diff --git a/src/channels.rs b/src/channels.rs new file mode 100644 index 0000000..69317d5 --- /dev/null +++ b/src/channels.rs @@ -0,0 +1,155 @@ +use crate::modules::ModuleUpdateEvent; +use crate::spawn; +use smithay_client_toolkit::reexports::calloop; +use std::fmt::Debug; +use tokio::sync::{broadcast, mpsc}; + +pub trait SyncSenderExt { + /// Asynchronously sends a message on the channel, + /// panicking if it cannot be sent. + /// + /// This should be used in cases where sending should *never* fail, + /// or where failing indicates a serious bug. + fn send_expect(&self, message: T); +} + +impl SyncSenderExt for std::sync::mpsc::Sender { + #[inline] + fn send_expect(&self, message: T) { + self.send(message).expect(crate::error::ERR_CHANNEL_SEND); + } +} + +impl SyncSenderExt for calloop::channel::Sender { + #[inline] + fn send_expect(&self, message: T) { + self.send(message).expect(crate::error::ERR_CHANNEL_SEND); + } +} + +impl SyncSenderExt for broadcast::Sender { + #[inline] + fn send_expect(&self, message: T) { + self.send(message).expect(crate::error::ERR_CHANNEL_SEND); + } +} + +pub trait AsyncSenderExt: Sync + Send + Sized + Clone { + /// Asynchronously sends a message on the channel, + /// panicking if it cannot be sent. + /// + /// This should be used in cases where sending should *never* fail, + /// or where failing indicates a serious bug. + fn send_expect(&self, message: T) -> impl Future + Send; + + /// Asynchronously sends a message on the channel, + /// spawning a task to allow it to be sent in the background, + /// and panicking if it cannot be sent. + /// + /// Note that this function will return *before* the message is sent. + /// + /// This should be used in cases where sending should *never* fail, + /// or where failing indicates a serious bug. + #[inline] + fn send_spawn(&self, message: T) + where + Self: 'static, + T: Send + 'static, + { + let tx = self.clone(); + spawn(async move { tx.send_expect(message).await }); + } + + /// Shorthand for [`AsyncSenderExt::send_expect`] + /// when sending a [`ModuleUpdateEvent::Update`]. + #[inline] + async fn send_update(&self, update: U) + where + Self: AsyncSenderExt>, + { + self.send_expect(ModuleUpdateEvent::Update(update)).await; + } + + /// Shorthand for [`AsyncSenderExt::send_spawn`] + /// when sending a [`ModuleUpdateEvent::Update`]. + #[inline] + fn send_update_spawn(&self, update: U) + where + Self: AsyncSenderExt> + 'static, + U: Clone + Send + 'static, + { + self.send_spawn(ModuleUpdateEvent::Update(update)); + } +} + +impl AsyncSenderExt for mpsc::Sender { + #[inline] + async fn send_expect(&self, message: T) { + self.send(message) + .await + .expect(crate::error::ERR_CHANNEL_SEND); + } +} + +pub trait MpscReceiverExt { + /// Spawns a `GLib` future on the local thread, and calls `rx.recv()` + /// in a loop, passing the message to `f`. + /// + /// This allows use of `GObjects` and futures in the same context. + fn recv_glib(self, f: F) + where + F: FnMut(T) + 'static; +} + +impl MpscReceiverExt for mpsc::Receiver { + fn recv_glib(mut self, mut f: F) + where + F: FnMut(T) + 'static, + { + glib::spawn_future_local(async move { + while let Some(val) = self.recv().await { + f(val); + } + }); + } +} + +pub trait BroadcastReceiverExt +where + T: Debug + Clone + 'static, +{ + /// Spawns a `GLib` future on the local thread, and calls `rx.recv()` + /// in a loop, passing the message to `f`. + /// + /// This allows use of `GObjects` and futures in the same context. + fn recv_glib(self, f: F) + where + F: FnMut(T) + 'static; +} + +impl BroadcastReceiverExt for broadcast::Receiver +where + T: Debug + Clone + 'static, +{ + fn recv_glib(mut self, mut f: F) + where + F: FnMut(T) + 'static, + { + glib::spawn_future_local(async move { + loop { + match self.recv().await { + Ok(val) => f(val), + Err(broadcast::error::RecvError::Lagged(count)) => { + tracing::warn!( + "Channel lagged behind by {count}, this may result in unexpected or broken behaviour" + ); + } + Err(err) => { + tracing::error!("{err:?}"); + break; + } + } + } + }); + } +} diff --git a/src/clients/clipboard.rs b/src/clients/clipboard.rs index 7444119..64fd853 100644 --- a/src/clients/clipboard.rs +++ b/src/clients/clipboard.rs @@ -1,5 +1,6 @@ use super::wayland::{self, ClipboardItem}; -use crate::{arc_mut, lock, register_client, spawn, try_send}; +use crate::channels::AsyncSenderExt; +use crate::{arc_mut, lock, register_client, spawn}; use indexmap::IndexMap; use indexmap::map::Iter; use std::sync::{Arc, Mutex}; @@ -46,7 +47,7 @@ impl Client { let senders = lock!(senders); let iter = senders.iter(); for (tx, _) in iter { - try_send!(tx, ClipboardEvent::Add(item.clone())); + tx.send_spawn(ClipboardEvent::Add(item.clone())); } lock!(cache).insert(item, senders.len()); @@ -74,16 +75,17 @@ impl Client { let removed_id = lock!(cache) .remove_ref_first() .expect("Clipboard cache unexpectedly empty"); - try_send!(tx, ClipboardEvent::Remove(removed_id)); + + tx.send_spawn(ClipboardEvent::Remove(removed_id)); } - try_send!(tx, ClipboardEvent::Add(item.clone())); + tx.send_spawn(ClipboardEvent::Add(item.clone())); } }, |existing_id| { let senders = lock!(senders); let iter = senders.iter(); for (tx, _) in iter { - try_send!(tx, ClipboardEvent::Activate(existing_id)); + tx.send_spawn(ClipboardEvent::Activate(existing_id)); } }, ); @@ -106,7 +108,7 @@ impl Client { let iter = cache.iter(); for (_, (item, _)) in iter { - try_send!(tx, ClipboardEvent::Add(item.clone())); + tx.send_spawn(ClipboardEvent::Add(item.clone())); } } @@ -130,7 +132,7 @@ impl Client { let senders = lock!(self.senders); let iter = senders.iter(); for (tx, _) in iter { - try_send!(tx, ClipboardEvent::Activate(id)); + tx.send_spawn(ClipboardEvent::Activate(id)); } } @@ -140,7 +142,7 @@ impl Client { let senders = lock!(self.senders); let iter = senders.iter(); for (tx, _) in iter { - try_send!(tx, ClipboardEvent::Remove(id)); + tx.send_spawn(ClipboardEvent::Remove(id)); } } } diff --git a/src/clients/compositor/hyprland.rs b/src/clients/compositor/hyprland.rs index 4745255..7846a0c 100644 --- a/src/clients/compositor/hyprland.rs +++ b/src/clients/compositor/hyprland.rs @@ -3,7 +3,8 @@ use super::{BindModeClient, BindModeUpdate}; #[cfg(feature = "keyboard+hyprland")] use super::{KeyboardLayoutClient, KeyboardLayoutUpdate}; use super::{Visibility, Workspace}; -use crate::{arc_mut, lock, send, spawn_blocking}; +use crate::channels::SyncSenderExt; +use crate::{arc_mut, lock, spawn_blocking}; use color_eyre::Result; use hyprland::ctl::switch_xkb_layout; use hyprland::data::{Devices, Workspace as HWorkspace, Workspaces}; @@ -121,7 +122,7 @@ impl Client { match workspace { Ok(Some(workspace)) => { - send!(tx, WorkspaceUpdate::Add(workspace)); + tx.send_expect(WorkspaceUpdate::Add(workspace)); } Err(e) => error!("Failed to get workspace: {e:#}"), _ => {} @@ -230,13 +231,10 @@ impl Client { let _lock = lock!(lock); debug!("Received workspace rename: {data:?}"); - send!( - tx, - WorkspaceUpdate::Rename { - id: data.id as i64, - name: data.name - } - ); + tx.send_expect(WorkspaceUpdate::Rename { + id: data.id as i64, + name: data.name, + }); }); } @@ -247,7 +245,7 @@ impl Client { event_listener.add_workspace_deleted_handler(move |data| { let _lock = lock!(lock); debug!("Received workspace destroy: {data:?}"); - send!(tx, WorkspaceUpdate::Remove(data.id as i64)); + tx.send_expect(WorkspaceUpdate::Remove(data.id as i64)); }); } @@ -271,13 +269,10 @@ impl Client { error!("Unable to locate client"); }, |c| { - send!( - tx, - WorkspaceUpdate::Urgent { - id: c.workspace.id as i64, - urgent: true, - } - ); + tx.send_expect(WorkspaceUpdate::Urgent { + id: c.workspace.id as i64, + urgent: true, + }); }, ); }); @@ -333,8 +328,7 @@ impl Client { }; debug!("Received layout: {layout:?}"); - - send!(tx, KeyboardLayoutUpdate(layout)); + tx.send_expect(KeyboardLayoutUpdate(layout)); }); } @@ -351,13 +345,10 @@ impl Client { let _lock = lock!(lock); debug!("Received bind mode: {bind_mode:?}"); - send!( - tx, - BindModeUpdate { - name: bind_mode, - pango_markup: false, - } - ); + tx.send_expect(BindModeUpdate { + name: bind_mode, + pango_markup: false, + }); }); } @@ -369,21 +360,15 @@ impl Client { workspace: Workspace, tx: &Sender, ) { - send!( - tx, - WorkspaceUpdate::Focus { - old: prev_workspace.take(), - new: workspace.clone(), - } - ); + tx.send_expect(WorkspaceUpdate::Focus { + old: prev_workspace.take(), + new: workspace.clone(), + }); - send!( - tx, - WorkspaceUpdate::Urgent { - id: workspace.id, - urgent: false, - } - ); + tx.send_expect(WorkspaceUpdate::Urgent { + id: workspace.id, + urgent: false, + }); prev_workspace.replace(workspace); } @@ -439,7 +424,9 @@ impl super::WorkspaceClient for Client { }) .collect(); - send!(self.workspace.tx, WorkspaceUpdate::Init(workspaces)); + self.workspace + .tx + .send_expect(WorkspaceUpdate::Init(workspaces)); } Err(e) => { error!("Failed to get workspaces: {e:#}"); @@ -486,7 +473,9 @@ impl KeyboardLayoutClient for Client { .map(|k| k.active_keymap.clone()) }) { Ok(Some(layout)) => { - send!(self.keyboard_layout.tx, KeyboardLayoutUpdate(layout)); + self.keyboard_layout + .tx + .send_expect(KeyboardLayoutUpdate(layout)); } Ok(None) => error!("Failed to get current keyboard layout hyprland"), Err(err) => error!("Failed to get devices: {err:#?}"), diff --git a/src/clients/compositor/niri/mod.rs b/src/clients/compositor/niri/mod.rs index f05a41e..130ec61 100644 --- a/src/clients/compositor/niri/mod.rs +++ b/src/clients/compositor/niri/mod.rs @@ -1,4 +1,4 @@ -use crate::{clients::compositor::Visibility, send, spawn}; +use crate::{clients::compositor::Visibility, spawn}; use color_eyre::Report; use tracing::{error, warn}; @@ -7,6 +7,7 @@ use tokio::sync::broadcast; use super::{Workspace as IronWorkspace, WorkspaceClient, WorkspaceUpdate}; mod connection; +use crate::channels::SyncSenderExt; use connection::{Action, Connection, Event, Request, WorkspaceReferenceArg}; #[derive(Debug)] @@ -151,7 +152,7 @@ impl Client { }; for event in events { - send!(tx, event); + tx.send_expect(event); } } diff --git a/src/clients/compositor/sway.rs b/src/clients/compositor/sway.rs index ba984c6..3fcb1f3 100644 --- a/src/clients/compositor/sway.rs +++ b/src/clients/compositor/sway.rs @@ -1,6 +1,7 @@ use super::{Visibility, Workspace}; +use crate::channels::SyncSenderExt; use crate::clients::sway::Client; -use crate::{await_sync, error, send, spawn}; +use crate::{await_sync, error, spawn}; use color_eyre::Report; use swayipc_async::{InputChange, InputEvent, Node, WorkspaceChange, WorkspaceEvent}; use tokio::sync::broadcast::{Receiver, channel}; @@ -8,7 +9,7 @@ use tokio::sync::broadcast::{Receiver, channel}; #[cfg(feature = "workspaces")] use super::WorkspaceUpdate; -#[cfg(feature = "workspaces")] +#[cfg(feature = "workspaces+sway")] impl super::WorkspaceClient for Client { fn focus(&self, id: i64) { let client = self.connection().clone(); @@ -49,13 +50,13 @@ impl super::WorkspaceClient for Client { let event = WorkspaceUpdate::Init(workspaces.into_iter().map(Workspace::from).collect()); - send!(tx, event); + tx.send_expect(event); drop(client); self.add_listener::(move |event| { let update = WorkspaceUpdate::from(event.clone()); - send!(tx, update); + tx.send_expect(update); }) .await .expect("to add listener"); @@ -198,7 +199,7 @@ impl KeyboardLayoutClient for Client { let inputs = client.get_inputs().await.expect("to get inputs"); if let Some(layout) = inputs.into_iter().find_map(|i| i.xkb_active_layout_name) { - send!(tx, KeyboardLayoutUpdate(layout)); + tx.send_expect(KeyboardLayoutUpdate(layout)); } else { error!("Failed to get keyboard layout from Sway!"); } @@ -207,7 +208,7 @@ impl KeyboardLayoutClient for Client { self.add_listener::(move |event| { if let Ok(layout) = KeyboardLayoutUpdate::try_from(event.clone()) { - send!(tx, layout); + tx.send_expect(layout); } }) .await @@ -255,13 +256,10 @@ impl BindModeClient for Client { mode.change.clone() }; - send!( - tx, - BindModeUpdate { - name, - pango_markup: mode.pango_markup, - } - ); + tx.send_expect(BindModeUpdate { + name, + pango_markup: mode.pango_markup, + }); }) .await })?; diff --git a/src/clients/libinput.rs b/src/clients/libinput.rs index 3ecd6c4..dff0f80 100644 --- a/src/clients/libinput.rs +++ b/src/clients/libinput.rs @@ -1,4 +1,5 @@ -use crate::{Ironbar, arc_rw, read_lock, send, spawn, write_lock}; +use crate::channels::SyncSenderExt; +use crate::{Ironbar, arc_rw, read_lock, spawn, write_lock}; use color_eyre::{Report, Result}; use colpetto::event::{AsRawEvent, DeviceEvent, KeyState, KeyboardEvent}; use colpetto::{DeviceCapability, Libinput}; @@ -179,7 +180,7 @@ impl Client { device_path.display() ); write_lock!(self.known_devices).push(device_path.to_path_buf()); - send!(self.tx, Event::Device); + self.tx.send_expect(Event::Device); } } } @@ -208,7 +209,7 @@ impl Client { let data = KeyData { device_path, key }; if let Ok(event) = data.try_into() { - send!(tx, event); + tx.send_expect(event); } }); } diff --git a/src/clients/music/mpd.rs b/src/clients/music/mpd.rs index 9d1157d..790d6ba 100644 --- a/src/clients/music/mpd.rs +++ b/src/clients/music/mpd.rs @@ -1,7 +1,8 @@ use super::{ MusicClient, PlayerState, PlayerUpdate, ProgressTick, Status, TICK_INTERVAL_MS, Track, }; -use crate::{Ironbar, await_sync, send, spawn}; +use crate::channels::SyncSenderExt; +use crate::{Ironbar, await_sync, spawn}; use color_eyre::Report; use color_eyre::Result; use mpd_client::client::{ConnectionEvent, Subsystem}; @@ -97,7 +98,7 @@ impl Client { let status = Status::from(status); let update = PlayerUpdate::Update(Box::new(track), status); - send!(tx, update); + tx.send_expect(update); } Ok(()) @@ -113,7 +114,7 @@ impl Client { elapsed: status.elapsed, }); - send!(tx, update); + tx.send_expect(update); } } } diff --git a/src/clients/music/mpris.rs b/src/clients/music/mpris.rs index 6eee6e2..96af8cd 100644 --- a/src/clients/music/mpris.rs +++ b/src/clients/music/mpris.rs @@ -1,6 +1,7 @@ use super::{MusicClient, PlayerState, PlayerUpdate, Status, TICK_INTERVAL_MS, Track}; +use crate::channels::SyncSenderExt; use crate::clients::music::ProgressTick; -use crate::{arc_mut, lock, send, spawn_blocking}; +use crate::{arc_mut, lock, spawn_blocking}; use color_eyre::Result; use mpris::{DBusError, Event, Metadata, PlaybackStatus, Player, PlayerFinder}; use std::cmp; @@ -137,7 +138,7 @@ impl Client { let mut players_locked = lock!(players); players_locked.remove(identity); if players_locked.is_empty() { - send!(tx, PlayerUpdate::Update(Box::new(None), Status::default())); + tx.send_expect(PlayerUpdate::Update(Box::new(None), Status::default())); } }; @@ -212,7 +213,7 @@ impl Client { let track = Track::from(metadata); let player_update = PlayerUpdate::Update(Box::new(Some(track)), status); - send!(tx, player_update); + tx.send_expect(player_update); Ok(()) } @@ -242,7 +243,7 @@ impl Client { duration: metadata.length(), }); - send!(tx, update); + tx.send_expect(update); } } } @@ -327,7 +328,9 @@ impl MusicClient for Client { state: PlayerState::Stopped, volume_percent: None, }; - send!(self.tx, PlayerUpdate::Update(Box::new(None), status)); + + self.tx + .send_expect(PlayerUpdate::Update(Box::new(None), status)); } rx diff --git a/src/clients/swaync/mod.rs b/src/clients/swaync/mod.rs index 9ec71a5..e1ee1db 100644 --- a/src/clients/swaync/mod.rs +++ b/src/clients/swaync/mod.rs @@ -1,6 +1,7 @@ mod dbus; -use crate::{register_fallible_client, send, spawn}; +use crate::channels::SyncSenderExt; +use crate::{register_fallible_client, spawn}; use color_eyre::{Report, Result}; use dbus::SwayNcProxy; use serde::Deserialize; @@ -60,7 +61,7 @@ impl Client { .deserialize::() .expect("to deserialize"); debug!("Received event: {ev:?}"); - send!(tx, ev); + tx.send_expect(ev); } }); } diff --git a/src/clients/volume/mod.rs b/src/clients/volume/mod.rs index ede6536..5496ff3 100644 --- a/src/clients/volume/mod.rs +++ b/src/clients/volume/mod.rs @@ -1,7 +1,7 @@ mod sink; mod sink_input; -use crate::{APP_ID, arc_mut, lock, register_client, send, spawn_blocking}; +use crate::{APP_ID, arc_mut, lock, register_client, spawn_blocking}; use libpulse_binding::callbacks::ListResult; use libpulse_binding::context::introspect::{Introspector, ServerInfo}; use libpulse_binding::context::subscribe::{Facility, InterestMaskSet, Operation}; @@ -14,6 +14,7 @@ use std::sync::{Arc, Mutex}; use tokio::sync::broadcast; use tracing::{debug, error, info, trace, warn}; +use crate::channels::SyncSenderExt; pub use sink::Sink; pub use sink_input::SinkInput; @@ -271,7 +272,7 @@ fn set_default_sink( { sink.active = true; debug!("Set sink active: {}", sink.name); - send!(tx, Event::UpdateSink(sink.clone())); + tx.send_expect(Event::UpdateSink(sink.clone())); } else { warn!("Couldn't find sink: {}", default_sink_name); } diff --git a/src/clients/volume/sink.rs b/src/clients/volume/sink.rs index b88a168..1269c9d 100644 --- a/src/clients/volume/sink.rs +++ b/src/clients/volume/sink.rs @@ -1,5 +1,6 @@ use super::{ArcMutVec, Client, ConnectionState, Event, percent_to_volume, volume_to_percent}; -use crate::{lock, send}; +use crate::channels::SyncSenderExt; +use crate::lock; use libpulse_binding::callbacks::ListResult; use libpulse_binding::context::Context; use libpulse_binding::context::introspect::SinkInfo; @@ -62,7 +63,7 @@ impl Client { let ListResult::Item(info) = info else { return; }; - send!(tx, info.volume); + tx.send_expect(info.volume); }); let new_volume = percent_to_volume(volume_percent); @@ -129,7 +130,7 @@ pub fn add(info: ListResult<&SinkInfo>, sinks: &ArcMutVec, tx: &broadcast: trace!("adding {info:?}"); lock!(sinks).push(info.into()); - send!(tx, Event::AddSink(info.into())); + tx.send_expect(Event::AddSink(info.into())); } fn update( @@ -170,7 +171,7 @@ fn update( } } - send!(tx, Event::UpdateSink(sink)); + tx.send_expect(Event::UpdateSink(sink)); } fn remove(index: u32, sinks: &ArcMutVec, tx: &broadcast::Sender) { @@ -180,6 +181,6 @@ fn remove(index: u32, sinks: &ArcMutVec, tx: &broadcast::Sender) { if let Some(pos) = sinks.iter().position(|s| s.index == index) { let info = sinks.remove(pos); - send!(tx, Event::RemoveSink(info.name)); + tx.send_expect(Event::RemoveSink(info.name)); } } diff --git a/src/clients/volume/sink_input.rs b/src/clients/volume/sink_input.rs index da77ccd..b6e11aa 100644 --- a/src/clients/volume/sink_input.rs +++ b/src/clients/volume/sink_input.rs @@ -1,5 +1,6 @@ use super::{ArcMutVec, Client, ConnectionState, Event, percent_to_volume, volume_to_percent}; -use crate::{lock, send}; +use crate::channels::SyncSenderExt; +use crate::lock; use libpulse_binding::callbacks::ListResult; use libpulse_binding::context::Context; use libpulse_binding::context::introspect::SinkInputInfo; @@ -49,7 +50,7 @@ impl Client { let ListResult::Item(info) = info else { return; }; - send!(tx, info.volume); + tx.send_expect(info.volume); }); let new_volume = percent_to_volume(volume_percent); @@ -118,7 +119,7 @@ pub fn add( trace!("adding {info:?}"); lock!(inputs).push(info.into()); - send!(tx, Event::AddInput(info.into())); + tx.send_expect(Event::AddInput(info.into())); } fn update( @@ -142,7 +143,7 @@ fn update( inputs[pos] = info.into(); } - send!(tx, Event::UpdateInput(info.into())); + tx.send_expect(Event::UpdateInput(info.into())); } fn remove(index: u32, inputs: &ArcMutVec, tx: &broadcast::Sender) { @@ -152,6 +153,6 @@ fn remove(index: u32, inputs: &ArcMutVec, tx: &broadcast::Sender send!(output_tx, event), + Event::Output(event) => output_tx.send_expect(event), #[cfg(any(feature = "focused", feature = "launcher"))] - Event::Toplevel(event) => send!(toplevel_tx, event), + Event::Toplevel(event) => toplevel_tx.send_expect(event), #[cfg(feature = "clipboard")] - Event::Clipboard(item) => send!(clipboard_tx, item), + Event::Clipboard(item) => clipboard_tx.send_expect(item), }; } }); @@ -176,7 +177,7 @@ impl Client { /// Sends a request to the environment event loop, /// and returns the response. fn send_request(&self, request: Request) -> Response { - send!(self.tx, request); + self.tx.send_expect(request); lock!(self.rx).recv().expect(ERR_CHANNEL_RECV) } @@ -333,12 +334,12 @@ impl Environment { match event { Msg(Request::Roundtrip) => { debug!("received roundtrip request"); - send!(env.response_tx, Response::Ok); + env.response_tx.send_expect(Response::Ok); } #[cfg(feature = "ipc")] Msg(Request::OutputInfoAll) => { let infos = env.output_info_all(); - send!(env.response_tx, Response::OutputInfoAll(infos)); + env.response_tx.send_expect(Response::OutputInfoAll(infos)); } #[cfg(any(feature = "focused", feature = "launcher"))] Msg(Request::ToplevelInfoAll) => { @@ -347,7 +348,9 @@ impl Environment { .iter() .filter_map(ToplevelHandle::info) .collect(); - send!(env.response_tx, Response::ToplevelInfoAll(infos)); + + env.response_tx + .send_expect(Response::ToplevelInfoAll(infos)); } #[cfg(feature = "launcher")] Msg(Request::ToplevelFocus(id)) => { @@ -361,7 +364,7 @@ impl Environment { handle.focus(&seat); } - send!(env.response_tx, Response::Ok); + env.response_tx.send_expect(Response::Ok); } #[cfg(feature = "launcher")] Msg(Request::ToplevelMinimize(id)) => { @@ -374,17 +377,17 @@ impl Environment { handle.minimize(); } - send!(env.response_tx, Response::Ok); + env.response_tx.send_expect(Response::Ok); } #[cfg(feature = "clipboard")] Msg(Request::CopyToClipboard(item)) => { env.copy_to_clipboard(item); - send!(env.response_tx, Response::Ok); + env.response_tx.send_expect(Response::Ok); } #[cfg(feature = "clipboard")] Msg(Request::ClipboardItem) => { let item = lock!(env.clipboard).clone(); - send!(env.response_tx, Response::ClipboardItem(item)); + env.response_tx.send_expect(Response::ClipboardItem(item)); } calloop_channel::Event::Closed => error!("request channel unexpectedly closed"), } diff --git a/src/clients/wayland/wl_output.rs b/src/clients/wayland/wl_output.rs index da3b9b1..602978a 100644 --- a/src/clients/wayland/wl_output.rs +++ b/src/clients/wayland/wl_output.rs @@ -1,5 +1,5 @@ use super::{Client, Environment, Event}; -use crate::try_send; +use crate::channels::AsyncSenderExt; use smithay_client_toolkit::output::{OutputHandler, OutputInfo, OutputState}; use tokio::sync::broadcast; use tracing::{debug, error}; @@ -63,13 +63,10 @@ impl OutputHandler for Environment { fn new_output(&mut self, _conn: &Connection, _qh: &QueueHandle, output: WlOutput) { debug!("Handler received new output"); if let Some(info) = self.output_state.info(&output) { - try_send!( - self.event_tx, - Event::Output(OutputEvent { - output: info, - event_type: OutputEventType::New - }) - ); + self.event_tx.send_spawn(Event::Output(OutputEvent { + output: info, + event_type: OutputEventType::New, + })); } else { error!("Output is missing information!"); } @@ -78,13 +75,10 @@ impl OutputHandler for Environment { fn update_output(&mut self, _conn: &Connection, _qh: &QueueHandle, output: WlOutput) { debug!("Handle received output update"); if let Some(info) = self.output_state.info(&output) { - try_send!( - self.event_tx, - Event::Output(OutputEvent { - output: info, - event_type: OutputEventType::Update - }) - ); + self.event_tx.send_spawn(Event::Output(OutputEvent { + output: info, + event_type: OutputEventType::Update, + })); } else { error!("Output is missing information!"); } @@ -93,13 +87,10 @@ impl OutputHandler for Environment { fn output_destroyed(&mut self, _conn: &Connection, _qh: &QueueHandle, output: WlOutput) { debug!("Handle received output destruction"); if let Some(info) = self.output_state.info(&output) { - try_send!( - self.event_tx, - Event::Output(OutputEvent { - output: info, - event_type: OutputEventType::Destroyed - }) - ); + self.event_tx.send_spawn(Event::Output(OutputEvent { + output: info, + event_type: OutputEventType::Destroyed, + })); } else { error!("Output is missing information!"); } diff --git a/src/clients/wayland/wlr_data_control/mod.rs b/src/clients/wayland/wlr_data_control/mod.rs index 8d239af..b2245a4 100644 --- a/src/clients/wayland/wlr_data_control/mod.rs +++ b/src/clients/wayland/wlr_data_control/mod.rs @@ -7,7 +7,8 @@ use self::device::{DataControlDeviceDataExt, DataControlDeviceHandler}; use self::offer::{DataControlDeviceOffer, DataControlOfferHandler}; use self::source::DataControlSourceHandler; use super::{Client, Environment, Event, Request, Response}; -use crate::{Ironbar, lock, spawn, try_send}; +use crate::channels::AsyncSenderExt; +use crate::{Ironbar, lock, spawn}; use color_eyre::Result; use device::DataControlDevice; use glib::Bytes; @@ -219,14 +220,12 @@ impl DataControlDeviceHandler for Environment { let Some(mime_type) = MimeType::parse_multiple(&mime_types) else { lock!(self.clipboard).take(); // send an event so the clipboard module is aware it's changed - try_send!( - self.event_tx, - Event::Clipboard(ClipboardItem { - id: usize::MAX, - mime_type: String::new().into(), - value: Arc::new(ClipboardValue::Other) - }) - ); + self.event_tx.send_spawn(Event::Clipboard(ClipboardItem { + id: usize::MAX, + mime_type: String::new().into(), + value: Arc::new(ClipboardValue::Other), + })); + return; }; @@ -239,7 +238,7 @@ impl DataControlDeviceHandler for Environment { match Self::read_file(&mime_type, &mut read_pipe).await { Ok(item) => { lock!(clipboard).replace(item.clone()); - try_send!(tx, Event::Clipboard(item)); + tx.send_spawn(Event::Clipboard(item)); } Err(err) => error!("{err:?}"), } diff --git a/src/clients/wayland/wlr_foreign_toplevel/mod.rs b/src/clients/wayland/wlr_foreign_toplevel/mod.rs index c9baf0f..cc0eb12 100644 --- a/src/clients/wayland/wlr_foreign_toplevel/mod.rs +++ b/src/clients/wayland/wlr_foreign_toplevel/mod.rs @@ -4,11 +4,11 @@ pub mod manager; use self::handle::ToplevelHandleHandler; use self::manager::ToplevelManagerHandler; use super::{Client, Environment, Event, Request, Response}; -use crate::try_send; use tokio::sync::broadcast; use tracing::{debug, error, trace}; use wayland_client::{Connection, QueueHandle}; +use crate::channels::AsyncSenderExt; pub use handle::{ToplevelHandle, ToplevelInfo}; #[derive(Debug, Clone)] @@ -71,7 +71,8 @@ impl ToplevelHandleHandler for Environment { trace!("Adding new handle: {info:?}"); self.handles.push(handle.clone()); if let Some(info) = handle.info() { - try_send!(self.event_tx, Event::Toplevel(ToplevelEvent::New(info))); + self.event_tx + .send_spawn(Event::Toplevel(ToplevelEvent::New(info))); } } None => { @@ -92,7 +93,8 @@ impl ToplevelHandleHandler for Environment { Some(info) => { trace!("Updating handle: {info:?}"); if let Some(info) = handle.info() { - try_send!(self.event_tx, Event::Toplevel(ToplevelEvent::Update(info))); + self.event_tx + .send_spawn(Event::Toplevel(ToplevelEvent::Update(info))); } } None => { @@ -111,7 +113,8 @@ impl ToplevelHandleHandler for Environment { self.handles.retain(|h| h != &handle); if let Some(info) = handle.info() { - try_send!(self.event_tx, Event::Toplevel(ToplevelEvent::Remove(info))); + self.event_tx + .send_spawn(Event::Toplevel(ToplevelEvent::Remove(info))); } } } diff --git a/src/dynamic_value/dynamic_bool.rs b/src/dynamic_value/dynamic_bool.rs index 5833fbe..cff21a6 100644 --- a/src/dynamic_value/dynamic_bool.rs +++ b/src/dynamic_value/dynamic_bool.rs @@ -1,7 +1,8 @@ -use crate::script::Script; #[cfg(feature = "ipc")] -use crate::{Ironbar, send_async}; -use crate::{glib_recv_mpsc, spawn, try_send}; +use crate::Ironbar; +use crate::channels::{AsyncSenderExt, MpscReceiverExt}; +use crate::script::Script; +use crate::spawn; use cfg_if::cfg_if; use serde::Deserialize; use tokio::sync::mpsc; @@ -18,7 +19,7 @@ pub enum DynamicBool { } impl DynamicBool { - pub fn subscribe(self, mut f: F) + pub fn subscribe(self, f: F) where F: FnMut(bool) + 'static, { @@ -42,14 +43,14 @@ impl DynamicBool { let (tx, rx) = mpsc::channel(32); - glib_recv_mpsc!(rx, val => f(val)); + rx.recv_glib(f); spawn(async move { match value { DynamicBool::Script(script) => { script .run(None, |_, success| { - try_send!(tx, success); + tx.send_spawn(success); }) .await; } @@ -62,7 +63,7 @@ impl DynamicBool { while let Ok(value) = rx.recv().await { let has_value = value.is_some_and(|s| is_truthy(&s)); - send_async!(tx, has_value); + tx.send_expect(has_value).await; } } DynamicBool::Unknown(_) => unreachable!(), diff --git a/src/dynamic_value/dynamic_string.rs b/src/dynamic_value/dynamic_string.rs index 057da5e..6611f66 100644 --- a/src/dynamic_value/dynamic_string.rs +++ b/src/dynamic_value/dynamic_string.rs @@ -1,7 +1,8 @@ #[cfg(feature = "ipc")] use crate::Ironbar; +use crate::channels::{AsyncSenderExt, MpscReceiverExt}; use crate::script::{OutputStream, Script}; -use crate::{arc_mut, glib_recv_mpsc, lock, spawn, try_send}; +use crate::{arc_mut, lock, spawn}; use tokio::sync::mpsc; /// A segment of a dynamic string, @@ -25,7 +26,7 @@ enum DynamicStringSegment { /// label.set_label_escaped(&string); /// }); /// ``` -pub fn dynamic_string(input: &str, mut f: F) +pub fn dynamic_string(input: &str, f: F) where F: FnMut(String) + 'static, { @@ -55,7 +56,7 @@ where let _: String = std::mem::replace(&mut label_parts[i], out); let string = label_parts.join(""); - try_send!(tx, string); + tx.send_spawn(string); } }) .await; @@ -80,7 +81,7 @@ where let _: String = std::mem::replace(&mut label_parts[i], value); let string = label_parts.join(""); - try_send!(tx, string); + tx.send_spawn(string); } } }); @@ -88,12 +89,12 @@ where } } - glib_recv_mpsc!(rx , val => f(val)); + rx.recv_glib(f); // initialize if is_static { let label_parts = lock!(label_parts).join(""); - try_send!(tx, label_parts); + tx.send_spawn(label_parts); } } diff --git a/src/image/provider.rs b/src/image/provider.rs index 5004c37..7b46e35 100644 --- a/src/image/provider.rs +++ b/src/image/provider.rs @@ -1,6 +1,7 @@ +use crate::channels::{AsyncSenderExt, MpscReceiverExt}; use crate::desktop_file::get_desktop_icon_name; #[cfg(feature = "http")] -use crate::{glib_recv_mpsc, send_async, spawn}; +use crate::spawn; use cfg_if::cfg_if; use color_eyre::{Help, Report, Result}; use gtk::cairo::Surface; @@ -151,14 +152,14 @@ impl<'a> ImageProvider<'a> { spawn(async move { let bytes = Self::get_bytes_from_http(url).await; if let Ok(bytes) = bytes { - send_async!(tx, bytes); + tx.send_expect(bytes).await; } }); { let size = self.size; let image = image.clone(); - glib_recv_mpsc!(rx, bytes => { + rx.recv_glib(move |bytes| { let stream = MemoryInputStream::from_bytes(&bytes); let scale = image.scale_factor(); @@ -173,8 +174,7 @@ impl<'a> ImageProvider<'a> { ); // Different error types makes this a bit awkward - match pixbuf.map(|pixbuf| Self::create_and_load_surface(&pixbuf, &image)) - { + match pixbuf.map(|pixbuf| Self::create_and_load_surface(&pixbuf, &image)) { Ok(Err(err)) => error!("{err:?}"), Err(err) => error!("{err:?}"), _ => {} diff --git a/src/ipc/server/mod.rs b/src/ipc/server/mod.rs index e6592e2..bdb7ef1 100644 --- a/src/ipc/server/mod.rs +++ b/src/ipc/server/mod.rs @@ -13,11 +13,11 @@ use tokio::net::{UnixListener, UnixStream}; use tokio::sync::mpsc::{self, Receiver, Sender}; use tracing::{debug, error, info, warn}; +use super::Ipc; +use crate::channels::{AsyncSenderExt, MpscReceiverExt}; use crate::ipc::{Command, Response}; use crate::style::load_css; -use crate::{Ironbar, glib_recv_mpsc, send_async, spawn, try_send}; - -use super::Ipc; +use crate::{Ironbar, spawn}; impl Ipc { /// Starts the IPC server on its socket. @@ -66,9 +66,9 @@ impl Ipc { }); let application = application.clone(); - glib_recv_mpsc!(cmd_rx, command => { + cmd_rx.recv_glib(move |command| { let res = Self::handle_command(command, &application, &ironbar); - try_send!(res_tx, res); + res_tx.send_spawn(res); }); } @@ -91,7 +91,7 @@ impl Ipc { debug!("Received command: {command:?}"); - send_async!(cmd_tx, command); + cmd_tx.send_expect(command).await; let res = res_rx .recv() .await diff --git a/src/ironvar.rs b/src/ironvar.rs index 067ffef..cb57023 100644 --- a/src/ironvar.rs +++ b/src/ironvar.rs @@ -1,6 +1,7 @@ #![doc = include_str!("../docs/Ironvars.md")] -use crate::{arc_rw, read_lock, send, write_lock}; +use crate::channels::SyncSenderExt; +use crate::{arc_rw, read_lock, write_lock}; use color_eyre::{Report, Result}; use std::collections::HashMap; use std::sync::{Arc, RwLock}; @@ -74,14 +75,10 @@ impl VariableManager { impl Namespace for VariableManager { fn get(&self, key: &str) -> Option { if key.contains('.') { - let Some((ns, key)) = key.split_once('.') else { - return None; - }; + let (ns, key) = key.split_once('.')?; let namespaces = read_lock!(self.namespaces); - let Some(ns) = namespaces.get(ns) else { - return None; - }; + let ns = namespaces.get(ns)?; ns.get(key).map(|v| v.to_owned()) } else { @@ -161,14 +158,14 @@ impl IronVar { /// The change is broadcast to all receivers. fn set(&mut self, value: Option) { self.value.clone_from(&value); - send!(self.tx, value); + self.tx.send_expect(value); } /// Subscribes to the variable. /// The latest value is immediately sent to all receivers. fn subscribe(&self) -> broadcast::Receiver> { let rx = self.tx.subscribe(); - send!(self.tx, self.value.clone()); + self.tx.send_expect(self.value.clone()); rx } } diff --git a/src/macros.rs b/src/macros.rs index 2fe924f..1ae00a3 100644 --- a/src/macros.rs +++ b/src/macros.rs @@ -35,6 +35,7 @@ macro_rules! module_impl { /// send_async!(tx, "my message"); /// ``` #[macro_export] +#[deprecated(since = "0.17.0", note = "Use `AsyncSenderExt::send_expect` instead")] macro_rules! send_async { ($tx:expr, $msg:expr) => { $tx.send($msg).await.expect($crate::error::ERR_CHANNEL_SEND) @@ -50,6 +51,7 @@ macro_rules! send_async { /// send!(tx, "my message"); /// ``` #[macro_export] +#[deprecated(since = "0.17.0", note = "Use `SyncSenderExt::send_expect` instead")] macro_rules! send { ($tx:expr, $msg:expr) => { $tx.send($msg).expect($crate::error::ERR_CHANNEL_SEND) @@ -65,6 +67,7 @@ macro_rules! send { /// try_send!(tx, "my message"); /// ``` #[macro_export] +#[deprecated(since = "0.17.0", note = "Use `AsyncSenderExt::send_spawn` instead")] macro_rules! try_send { ($tx:expr, $msg:expr) => { $tx.try_send($msg).expect($crate::error::ERR_CHANNEL_SEND) @@ -85,6 +88,7 @@ macro_rules! try_send { /// module_update!(tx, "my event"); /// ``` #[macro_export] +#[deprecated(since = "0.17.0", note = "Use `AsyncSenderExt::send_update` instead")] macro_rules! module_update { ($tx:expr, $msg:expr) => { send_async!($tx, $crate::modules::ModuleUpdateEvent::Update($msg)) @@ -105,6 +109,10 @@ macro_rules! module_update { /// glib_recv(rx, msg => println!("{msg}")); /// ``` #[macro_export] +#[deprecated( + since = "0.17.0", + note = "Use `BroadcastReceiverExt::recv_glib` instead" +)] macro_rules! glib_recv { ($rx:expr, $func:ident) => { glib_recv!($rx, ev => $func(ev)) }; @@ -143,6 +151,7 @@ macro_rules! glib_recv { /// glib_recv_mpsc(rx, msg => println!("{msg}")); /// ``` #[macro_export] +#[deprecated(since = "0.17.0", note = "Use `MpscReceiverExt::recv_glib` instead")] macro_rules! glib_recv_mpsc { ($rx:expr, $func:ident) => { glib_recv_mpsc!($rx, ev => $func(ev)) }; diff --git a/src/main.rs b/src/main.rs index abed965..3c283eb 100644 --- a/src/main.rs +++ b/src/main.rs @@ -25,6 +25,7 @@ use tracing::{debug, error, info, warn}; use universal_config::ConfigLoader; use crate::bar::{Bar, create_bar}; +use crate::channels::SyncSenderExt; use crate::clients::Clients; use crate::clients::wayland::OutputEventType; use crate::config::{Config, MonitorConfig}; @@ -34,6 +35,7 @@ use crate::ironvar::{VariableManager, WritableNamespace}; use crate::style::load_css; mod bar; +mod channels; #[cfg(feature = "cli")] mod cli; mod clients; @@ -202,7 +204,7 @@ impl Ironbar { .expect("Error setting Ctrl-C handler"); let hold = app.hold(); - send!(activate_tx, hold); + activate_tx.send_expect(hold); }); { diff --git a/src/modules/bindmode.rs b/src/modules/bindmode.rs index e1bc617..e875e3e 100644 --- a/src/modules/bindmode.rs +++ b/src/modules/bindmode.rs @@ -1,8 +1,9 @@ +use crate::channels::{AsyncSenderExt, BroadcastReceiverExt}; use crate::clients::compositor::BindModeUpdate; use crate::config::{CommonConfig, LayoutConfig, TruncateMode}; use crate::gtk_helpers::IronbarLabelExt; use crate::modules::{Module, ModuleInfo, ModuleParts, WidgetContext}; -use crate::{glib_recv, module_impl, module_update, send_async, spawn}; +use crate::{module_impl, spawn}; use color_eyre::Result; use gtk::Label; use gtk::prelude::*; @@ -49,7 +50,7 @@ impl Module