1
0
Fork 0
mirror of https://github.com/Zedfrigg/ironbar.git synced 2025-07-02 03:01:04 +02:00

refactor: move various clients to own folder

This commit is contained in:
Jake Stanger 2022-11-06 23:38:51 +00:00
parent 94693c92e3
commit 4662f60ac5
No known key found for this signature in database
GPG key ID: C51FC8F9CB0BEA61
16 changed files with 21 additions and 25 deletions

4
src/clients/mod.rs Normal file
View file

@ -0,0 +1,4 @@
pub mod mpd;
pub mod sway;
pub mod system_tray;
pub mod wayland;

167
src/clients/mpd.rs Normal file
View file

@ -0,0 +1,167 @@
use lazy_static::lazy_static;
use mpd_client::client::{CommandError, Connection, ConnectionEvent, Subsystem};
use mpd_client::commands::Command;
use mpd_client::protocol::MpdProtocolError;
use mpd_client::responses::Status;
use mpd_client::Client;
use std::collections::HashMap;
use std::fmt::{Display, Formatter};
use std::os::unix::fs::FileTypeExt;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use tokio::net::{TcpStream, UnixStream};
use tokio::spawn;
use tokio::sync::broadcast::{channel, error::SendError, Receiver, Sender};
use tokio::sync::Mutex;
use tokio::time::sleep;
use tracing::debug;
lazy_static! {
static ref CONNECTIONS: Arc<Mutex<HashMap<String, Arc<MpdClient>>>> =
Arc::new(Mutex::new(HashMap::new()));
}
pub struct MpdClient {
client: Client,
tx: Sender<()>,
_rx: Receiver<()>,
}
#[derive(Debug)]
pub enum MpdConnectionError {
MaxRetries,
ProtocolError(MpdProtocolError),
}
impl Display for MpdConnectionError {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
Self::MaxRetries => write!(f, "Reached max retries"),
Self::ProtocolError(e) => write!(f, "{:?}", e),
}
}
}
impl std::error::Error for MpdConnectionError {}
impl MpdClient {
async fn new(host: &str) -> Result<Self, MpdConnectionError> {
debug!("Creating new MPD connection to {}", host);
let (client, mut state_changes) =
wait_for_connection(host, Duration::from_secs(5), None).await?;
let (tx, rx) = channel(16);
let tx2 = tx.clone();
spawn(async move {
while let Some(change) = state_changes.next().await {
debug!("Received state change: {:?}", change);
if let ConnectionEvent::SubsystemChange(
Subsystem::Player | Subsystem::Queue | Subsystem::Mixer,
) = change
{
tx2.send(())?;
}
}
Ok::<(), SendError<()>>(())
});
Ok(Self {
client,
tx,
_rx: rx,
})
}
pub fn subscribe(&self) -> Receiver<()> {
self.tx.subscribe()
}
pub async fn command<C: Command>(&self, command: C) -> Result<C::Response, CommandError> {
self.client.command(command).await
}
}
pub async fn get_client(host: &str) -> Result<Arc<MpdClient>, MpdConnectionError> {
let mut connections = CONNECTIONS.lock().await;
match connections.get(host) {
None => {
let client = MpdClient::new(host).await?;
let client = Arc::new(client);
connections.insert(host.to_string(), Arc::clone(&client));
Ok(client)
}
Some(client) => Ok(Arc::clone(client)),
}
}
async fn wait_for_connection(
host: &str,
interval: Duration,
max_retries: Option<usize>,
) -> Result<Connection, MpdConnectionError> {
let mut retries = 0;
let max_retries = max_retries.unwrap_or(usize::MAX);
loop {
if retries == max_retries {
break Err(MpdConnectionError::MaxRetries);
}
retries += 1;
match try_get_mpd_conn(host).await {
Ok(conn) => break Ok(conn),
Err(err) => {
if retries == max_retries {
break Err(MpdConnectionError::ProtocolError(err));
}
}
}
sleep(interval).await;
}
}
/// Cycles through each MPD host and
/// returns the first one which connects,
/// or none if there are none
async fn try_get_mpd_conn(host: &str) -> Result<Connection, MpdProtocolError> {
if is_unix_socket(host) {
connect_unix(host).await
} else {
connect_tcp(host).await
}
}
fn is_unix_socket(host: &str) -> bool {
let path = PathBuf::from(host);
path.exists()
&& path
.metadata()
.map_or(false, |metadata| metadata.file_type().is_socket())
}
async fn connect_unix(host: &str) -> Result<Connection, MpdProtocolError> {
let connection = UnixStream::connect(host).await?;
Client::connect(connection).await
}
async fn connect_tcp(host: &str) -> Result<Connection, MpdProtocolError> {
let connection = TcpStream::connect(host).await?;
Client::connect(connection).await
}
/// Gets the duration of the current song
pub fn get_duration(status: &Status) -> Option<u64> {
status.duration.map(|duration| duration.as_secs())
}
/// Gets the elapsed time of the current song
pub fn get_elapsed(status: &Status) -> Option<u64> {
status.elapsed.map(|duration| duration.as_secs())
}

74
src/clients/sway.rs Normal file
View file

@ -0,0 +1,74 @@
use async_once::AsyncOnce;
use color_eyre::Report;
use futures_util::StreamExt;
use lazy_static::lazy_static;
use std::sync::Arc;
use swayipc_async::{Connection, Event, EventType, WorkspaceEvent};
use tokio::spawn;
use tokio::sync::broadcast::{channel, Receiver, Sender};
use tokio::sync::Mutex;
use tracing::{info, trace};
pub struct SwayEventClient {
workspace_tx: Sender<Box<WorkspaceEvent>>,
_workspace_rx: Receiver<Box<WorkspaceEvent>>,
}
impl SwayEventClient {
fn new() -> Self {
let (workspace_tx, workspace_rx) = channel(16);
let workspace_tx2 = workspace_tx.clone();
spawn(async move {
let workspace_tx = workspace_tx2;
let client = Connection::new().await?;
info!("Sway IPC subscription client connected");
let event_types = [EventType::Workspace];
let mut events = client.subscribe(event_types).await?;
while let Some(event) = events.next().await {
trace!("event: {:?}", event);
if let Event::Workspace(ev) = event? {
workspace_tx.send(ev)?;
};
}
Ok::<(), Report>(())
});
Self {
workspace_tx,
_workspace_rx: workspace_rx,
}
}
/// Gets an event receiver for workspace events
pub fn subscribe_workspace(&self) -> Receiver<Box<WorkspaceEvent>> {
self.workspace_tx.subscribe()
}
}
lazy_static! {
static ref CLIENT: AsyncOnce<Arc<Mutex<Connection>>> = AsyncOnce::new(async {
let client = Connection::new()
.await
.expect("Failed to connect to Sway socket");
Arc::new(Mutex::new(client))
});
static ref SUB_CLIENT: SwayEventClient = SwayEventClient::new();
}
/// Gets the sway IPC client
pub async fn get_client() -> Arc<Mutex<Connection>> {
let client = CLIENT.get().await;
Arc::clone(client)
}
/// Gets the sway IPC event subscription client
pub fn get_sub_client() -> &'static SwayEventClient {
&SUB_CLIENT
}

View file

@ -0,0 +1,74 @@
use async_once::AsyncOnce;
use lazy_static::lazy_static;
use stray::message::{NotifierItemCommand, NotifierItemMessage};
use stray::StatusNotifierWatcher;
use tokio::spawn;
use tokio::sync::{broadcast, mpsc};
use tracing::debug;
pub struct TrayEventReceiver {
tx: mpsc::Sender<NotifierItemCommand>,
b_tx: broadcast::Sender<NotifierItemMessage>,
_b_rx: broadcast::Receiver<NotifierItemMessage>,
}
impl TrayEventReceiver {
async fn new() -> stray::error::Result<Self> {
let (tx, rx) = mpsc::channel(16);
let (b_tx, b_rx) = broadcast::channel(16);
let tray = StatusNotifierWatcher::new(rx).await?;
let mut host = tray.create_notifier_host("ironbar").await?;
let b_tx2 = b_tx.clone();
spawn(async move {
while let Ok(message) = host.recv().await {
b_tx2.send(message)?;
}
Ok::<(), broadcast::error::SendError<NotifierItemMessage>>(())
});
Ok(Self {
tx,
b_tx,
_b_rx: b_rx,
})
}
pub fn subscribe(
&self,
) -> (
mpsc::Sender<NotifierItemCommand>,
broadcast::Receiver<NotifierItemMessage>,
) {
(self.tx.clone(), self.b_tx.subscribe())
}
}
lazy_static! {
static ref CLIENT: AsyncOnce<TrayEventReceiver> = AsyncOnce::new(async {
const MAX_RETRIES: i32 = 10;
// sometimes this can fail
let mut retries = 0;
let value = loop {
retries += 1;
let tray = TrayEventReceiver::new().await;
if tray.is_ok() || retries == MAX_RETRIES {
break tray;
}
debug!("Failed to create StatusNotifierWatcher (attempt {retries})");
};
value.expect("Failed to create StatusNotifierWatcher")
});
}
pub async fn get_tray_event_client() -> &'static TrayEventReceiver {
CLIENT.get().await
}

View file

@ -0,0 +1,129 @@
use super::toplevel::{ToplevelEvent, ToplevelInfo};
use super::toplevel_manager::listen_for_toplevels;
use super::ToplevelChange;
use super::{Env, ToplevelHandler};
use color_eyre::Report;
use indexmap::IndexMap;
use smithay_client_toolkit::environment::Environment;
use smithay_client_toolkit::output::{with_output_info, OutputInfo};
use smithay_client_toolkit::reexports::calloop;
use smithay_client_toolkit::{new_default_environment, WaylandSource};
use std::sync::{Arc, RwLock};
use std::time::Duration;
use tokio::sync::{broadcast, oneshot};
use tokio::task::spawn_blocking;
use tracing::{error, trace};
use wayland_client::protocol::wl_seat::WlSeat;
use wayland_protocols::wlr::unstable::foreign_toplevel::v1::client::{
zwlr_foreign_toplevel_handle_v1::ZwlrForeignToplevelHandleV1,
zwlr_foreign_toplevel_manager_v1::ZwlrForeignToplevelManagerV1,
};
pub struct WaylandClient {
pub outputs: Vec<OutputInfo>,
pub seats: Vec<WlSeat>,
pub toplevels: Arc<RwLock<IndexMap<usize, (ToplevelInfo, ZwlrForeignToplevelHandleV1)>>>,
toplevel_tx: broadcast::Sender<ToplevelEvent>,
_toplevel_rx: broadcast::Receiver<ToplevelEvent>,
}
impl WaylandClient {
pub(super) async fn new() -> Self {
let (output_tx, output_rx) = oneshot::channel();
let (seat_tx, seat_rx) = oneshot::channel();
let (toplevel_tx, toplevel_rx) = broadcast::channel(32);
let toplevel_tx2 = toplevel_tx.clone();
let toplevels = Arc::new(RwLock::new(IndexMap::new()));
let toplevels2 = toplevels.clone();
// `queue` is not send so we need to handle everything inside the task
spawn_blocking(move || {
let (env, _display, queue) =
new_default_environment!(Env, fields = [toplevel: ToplevelHandler::init()])
.expect("Failed to connect to Wayland compositor");
let outputs = Self::get_outputs(&env);
output_tx
.send(outputs)
.expect("Failed to send outputs out of task");
let seats = env.get_all_seats();
seat_tx
.send(
seats
.into_iter()
.map(|seat| seat.detach())
.collect::<Vec<WlSeat>>(),
)
.expect("Failed to send seats out of task");
let _toplevel_manager = env.require_global::<ZwlrForeignToplevelManagerV1>();
let _listener = listen_for_toplevels(env, move |handle, event, _ddata| {
trace!("Received toplevel event: {:?}", event);
if event.change == ToplevelChange::Close {
toplevels2
.write()
.expect("Failed to get write lock on toplevels")
.remove(&event.toplevel.id);
} else {
toplevels2
.write()
.expect("Failed to get write lock on toplevels")
.insert(event.toplevel.id, (event.toplevel.clone(), handle));
}
toplevel_tx2
.send(event)
.expect("Failed to send toplevel event");
});
let mut event_loop =
calloop::EventLoop::<()>::try_new().expect("Failed to create new event loop");
WaylandSource::new(queue)
.quick_insert(event_loop.handle())
.expect("Failed to insert event loop into wayland event queue");
loop {
// TODO: Avoid need for duration here - can we force some event when sending requests?
if let Err(err) = event_loop.dispatch(Duration::from_millis(50), &mut ()) {
error!(
"{:?}",
Report::new(err).wrap_err("Failed to dispatch pending wayland events")
);
}
}
});
let outputs = output_rx
.await
.expect("Failed to receive outputs from task");
let seats = seat_rx.await.expect("Failed to receive seats from task");
Self {
outputs,
seats,
toplevels,
toplevel_tx,
_toplevel_rx: toplevel_rx,
}
}
pub fn subscribe_toplevels(&self) -> broadcast::Receiver<ToplevelEvent> {
self.toplevel_tx.subscribe()
}
fn get_outputs(env: &Environment<Env>) -> Vec<OutputInfo> {
let outputs = env.get_all_outputs();
outputs
.iter()
.filter_map(|output| with_output_info(output, Clone::clone))
.collect()
}
}

View file

@ -0,0 +1,53 @@
mod client;
mod toplevel;
mod toplevel_manager;
extern crate smithay_client_toolkit as sctk;
use async_once::AsyncOnce;
use lazy_static::lazy_static;
pub use toplevel::{ToplevelChange, ToplevelEvent, ToplevelInfo};
use toplevel_manager::{ToplevelHandler, ToplevelHandling, ToplevelStatusListener};
use wayland_client::{Attached, DispatchData, Interface};
use wayland_protocols::wlr::unstable::foreign_toplevel::v1::client::{
zwlr_foreign_toplevel_handle_v1::ZwlrForeignToplevelHandleV1,
zwlr_foreign_toplevel_manager_v1::ZwlrForeignToplevelManagerV1,
};
pub use client::WaylandClient;
/// A utility for lazy-loading globals.
/// Taken from `smithay_client_toolkit` where it's not exposed
#[derive(Debug)]
enum LazyGlobal<I: Interface> {
Unknown,
Seen { id: u32, version: u32 },
Bound(Attached<I>),
}
sctk::default_environment!(Env,
fields = [
toplevel: ToplevelHandler
],
singles = [
ZwlrForeignToplevelManagerV1 => toplevel
],
);
impl ToplevelHandling for Env {
fn listen<F>(&mut self, f: F) -> ToplevelStatusListener
where
F: FnMut(ZwlrForeignToplevelHandleV1, ToplevelEvent, DispatchData) + 'static,
{
self.toplevel.listen(f)
}
}
lazy_static! {
static ref CLIENT: AsyncOnce<WaylandClient> =
AsyncOnce::new(async { WaylandClient::new().await });
}
pub async fn get_client() -> &'static WaylandClient {
CLIENT.get().await
}

View file

@ -0,0 +1,151 @@
use std::collections::HashSet;
use std::sync::{Arc, RwLock};
use std::sync::atomic::{AtomicUsize, Ordering};
use tracing::trace;
use wayland_client::{DispatchData, Main};
use wayland_protocols::wlr::unstable::foreign_toplevel::v1::client::zwlr_foreign_toplevel_handle_v1::{Event, ZwlrForeignToplevelHandleV1};
const STATE_ACTIVE: u32 = 2;
const STATE_FULLSCREEN: u32 = 3;
static COUNTER: AtomicUsize = AtomicUsize::new(1);
fn get_id() -> usize {
COUNTER.fetch_add(1, Ordering::Relaxed)
}
#[derive(Debug, Clone, Default)]
pub struct ToplevelInfo {
pub id: usize,
pub app_id: String,
pub title: String,
pub active: bool,
pub fullscreen: bool,
ready: bool,
}
impl ToplevelInfo {
fn new() -> Self {
let id = get_id();
Self {
id,
..Default::default()
}
}
}
pub struct Toplevel;
#[derive(Debug, Clone)]
pub struct ToplevelEvent {
pub toplevel: ToplevelInfo,
pub change: ToplevelChange,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ToplevelChange {
New,
Close,
Title(String),
Focus(bool),
Fullscreen(bool),
}
fn toplevel_implem<F>(event: Event, info: &mut ToplevelInfo, implem: &mut F, ddata: DispatchData)
where
F: FnMut(ToplevelEvent, DispatchData),
{
trace!("event: {event:?} (info: {info:?})");
let change = match event {
Event::AppId { app_id } => {
info.app_id = app_id;
None
}
Event::Title { title } => {
info.title = title.clone();
if info.ready {
Some(ToplevelChange::Title(title))
} else {
None
}
}
Event::State { state } => {
// state is received as a `Vec<u8>` where every 4 bytes make up a `u32`
// the u32 then represents a value in the `State` enum.
assert_eq!(state.len() % 4, 0);
let state = (0..state.len() / 4)
.map(|i| {
let slice: [u8; 4] = state[i * 4..i * 4 + 4]
.try_into()
.expect("Received invalid state length");
u32::from_le_bytes(slice)
})
.collect::<HashSet<_>>();
let new_active = state.contains(&STATE_ACTIVE);
let new_fullscreen = state.contains(&STATE_FULLSCREEN);
let change = if info.ready && new_active != info.active {
Some(ToplevelChange::Focus(new_active))
} else if info.ready && new_fullscreen != info.fullscreen {
Some(ToplevelChange::Fullscreen(new_fullscreen))
} else {
None
};
info.active = new_active;
info.fullscreen = new_fullscreen;
change
}
Event::Closed => {
if info.ready {
Some(ToplevelChange::Close)
} else {
None
}
}
Event::OutputEnter { output: _ } => None,
Event::OutputLeave { output: _ } => None,
Event::Parent { parent: _ } => None,
Event::Done => {
if info.ready || info.app_id.is_empty() {
None
} else {
info.ready = true;
Some(ToplevelChange::New)
}
}
_ => unreachable!(),
};
if let Some(change) = change {
let event = ToplevelEvent {
change,
toplevel: info.clone(),
};
implem(event, ddata);
}
}
impl Toplevel {
pub fn init<F>(handle: &Main<ZwlrForeignToplevelHandleV1>, mut callback: F) -> Self
where
F: FnMut(ToplevelEvent, DispatchData) + 'static,
{
let inner = Arc::new(RwLock::new(ToplevelInfo::new()));
handle.quick_assign(move |_handle, event, ddata| {
let mut inner = inner
.write()
.expect("Failed to get write lock on toplevel inner state");
toplevel_implem(event, &mut inner, &mut callback, ddata);
});
Self
}
}

View file

@ -0,0 +1,164 @@
use super::toplevel::{Toplevel, ToplevelEvent};
use super::LazyGlobal;
use smithay_client_toolkit::environment::{Environment, GlobalHandler};
use std::cell::RefCell;
use std::rc;
use std::rc::Rc;
use tracing::warn;
use wayland_client::protocol::wl_registry::WlRegistry;
use wayland_client::{Attached, DispatchData};
use wayland_protocols::wlr::unstable::foreign_toplevel::v1::client::{
zwlr_foreign_toplevel_handle_v1::ZwlrForeignToplevelHandleV1,
zwlr_foreign_toplevel_manager_v1::{self, ZwlrForeignToplevelManagerV1},
};
struct ToplevelHandlerInner {
manager: LazyGlobal<ZwlrForeignToplevelManagerV1>,
registry: Option<Attached<WlRegistry>>,
toplevels: Vec<Toplevel>,
}
impl ToplevelHandlerInner {
const fn new() -> Self {
let toplevels = vec![];
Self {
registry: None,
manager: LazyGlobal::Unknown,
toplevels,
}
}
}
pub struct ToplevelHandler {
inner: Rc<RefCell<ToplevelHandlerInner>>,
status_listeners: Rc<RefCell<Vec<rc::Weak<RefCell<ToplevelStatusCallback>>>>>,
}
impl ToplevelHandler {
pub fn init() -> Self {
let inner = Rc::new(RefCell::new(ToplevelHandlerInner::new()));
Self {
inner,
status_listeners: Rc::new(RefCell::new(Vec::new())),
}
}
}
impl GlobalHandler<ZwlrForeignToplevelManagerV1> for ToplevelHandler {
fn created(
&mut self,
registry: Attached<WlRegistry>,
id: u32,
version: u32,
_ddata: DispatchData,
) {
let mut inner = RefCell::borrow_mut(&self.inner);
if inner.registry.is_none() {
inner.registry = Some(registry);
}
if let LazyGlobal::Unknown = inner.manager {
inner.manager = LazyGlobal::Seen { id, version }
} else {
warn!(
"Compositor advertised zwlr_foreign_toplevel_manager_v1 multiple times, ignoring."
);
}
}
fn get(&self) -> Option<Attached<ZwlrForeignToplevelManagerV1>> {
let mut inner = RefCell::borrow_mut(&self.inner);
match inner.manager {
LazyGlobal::Bound(ref mgr) => Some(mgr.clone()),
LazyGlobal::Unknown => None,
LazyGlobal::Seen { id, version } => {
let registry = inner.registry.as_ref().expect("Failed to get registry");
// current max protocol version = 3
let version = std::cmp::min(version, 3);
let manager = registry.bind::<ZwlrForeignToplevelManagerV1>(version, id);
{
let inner = self.inner.clone();
let status_listeners = self.status_listeners.clone();
manager.quick_assign(move |_, event, _ddata| {
let mut inner = RefCell::borrow_mut(&inner);
let status_listeners = status_listeners.clone();
match event {
zwlr_foreign_toplevel_manager_v1::Event::Toplevel {
toplevel: handle,
} => {
let toplevel =
Toplevel::init(&handle.clone(), move |event, ddata| {
notify_status_listeners(
&handle,
&event,
ddata,
&status_listeners,
);
});
inner.toplevels.push(toplevel);
}
zwlr_foreign_toplevel_manager_v1::Event::Finished => {}
_ => unreachable!(),
}
});
}
inner.manager = LazyGlobal::Bound((*manager).clone());
Some((*manager).clone())
}
}
}
}
type ToplevelStatusCallback =
dyn FnMut(ZwlrForeignToplevelHandleV1, ToplevelEvent, DispatchData) + 'static;
/// Notifies the callbacks of an event on the toplevel
fn notify_status_listeners(
toplevel: &ZwlrForeignToplevelHandleV1,
event: &ToplevelEvent,
mut ddata: DispatchData,
listeners: &RefCell<Vec<rc::Weak<RefCell<ToplevelStatusCallback>>>>,
) {
listeners.borrow_mut().retain(|lst| {
rc::Weak::upgrade(lst).map_or(false, |cb| {
(cb.borrow_mut())(toplevel.clone(), event.clone(), ddata.reborrow());
true
})
});
}
pub struct ToplevelStatusListener {
_cb: Rc<RefCell<ToplevelStatusCallback>>,
}
pub trait ToplevelHandling {
fn listen<F>(&mut self, f: F) -> ToplevelStatusListener
where
F: FnMut(ZwlrForeignToplevelHandleV1, ToplevelEvent, DispatchData) + 'static;
}
impl ToplevelHandling for ToplevelHandler {
fn listen<F>(&mut self, f: F) -> ToplevelStatusListener
where
F: FnMut(ZwlrForeignToplevelHandleV1, ToplevelEvent, DispatchData) + 'static,
{
let rc = Rc::new(RefCell::new(f)) as Rc<_>;
self.status_listeners.borrow_mut().push(Rc::downgrade(&rc));
ToplevelStatusListener { _cb: rc }
}
}
pub fn listen_for_toplevels<E, F>(env: Environment<E>, f: F) -> ToplevelStatusListener
where
E: ToplevelHandling,
F: FnMut(ZwlrForeignToplevelHandleV1, ToplevelEvent, DispatchData) + 'static,
{
env.with_inner(move |inner| ToplevelHandling::listen(inner, f))
}