use crate::modules::ModuleUpdateEvent; use crate::spawn; use smithay_client_toolkit::reexports::calloop; use std::fmt::Debug; use tokio::sync::{broadcast, mpsc}; pub trait SyncSenderExt { /// Asynchronously sends a message on the channel, /// panicking if it cannot be sent. /// /// This should be used in cases where sending should *never* fail, /// or where failing indicates a serious bug. fn send_expect(&self, message: T); } impl SyncSenderExt for std::sync::mpsc::Sender { #[inline] fn send_expect(&self, message: T) { self.send(message).expect(crate::error::ERR_CHANNEL_SEND); } } impl SyncSenderExt for calloop::channel::Sender { #[inline] fn send_expect(&self, message: T) { self.send(message).expect(crate::error::ERR_CHANNEL_SEND); } } impl SyncSenderExt for broadcast::Sender { #[inline] fn send_expect(&self, message: T) { self.send(message).expect(crate::error::ERR_CHANNEL_SEND); } } pub trait AsyncSenderExt: Sync + Send + Sized + Clone { /// Asynchronously sends a message on the channel, /// panicking if it cannot be sent. /// /// This should be used in cases where sending should *never* fail, /// or where failing indicates a serious bug. fn send_expect(&self, message: T) -> impl Future + Send; /// Asynchronously sends a message on the channel, /// spawning a task to allow it to be sent in the background, /// and panicking if it cannot be sent. /// /// Note that this function will return *before* the message is sent. /// /// This should be used in cases where sending should *never* fail, /// or where failing indicates a serious bug. #[inline] fn send_spawn(&self, message: T) where Self: 'static, T: Send + 'static, { let tx = self.clone(); spawn(async move { tx.send_expect(message).await }); } /// Shorthand for [`AsyncSenderExt::send_expect`] /// when sending a [`ModuleUpdateEvent::Update`]. #[inline] async fn send_update(&self, update: U) where Self: AsyncSenderExt>, { self.send_expect(ModuleUpdateEvent::Update(update)).await; } /// Shorthand for [`AsyncSenderExt::send_spawn`] /// when sending a [`ModuleUpdateEvent::Update`]. #[inline] fn send_update_spawn(&self, update: U) where Self: AsyncSenderExt> + 'static, U: Clone + Send + 'static, { self.send_spawn(ModuleUpdateEvent::Update(update)); } } impl AsyncSenderExt for mpsc::Sender { #[inline] async fn send_expect(&self, message: T) { self.send(message) .await .expect(crate::error::ERR_CHANNEL_SEND); } } pub trait MpscReceiverExt { /// Spawns a `GLib` future on the local thread, and calls `rx.recv()` /// in a loop, passing the message to `f`. /// /// This allows use of `GObjects` and futures in the same context. fn recv_glib(self, f: F) where F: FnMut(T) + 'static; } impl MpscReceiverExt for mpsc::Receiver { fn recv_glib(mut self, mut f: F) where F: FnMut(T) + 'static, { glib::spawn_future_local(async move { while let Some(val) = self.recv().await { f(val); } }); } } pub trait BroadcastReceiverExt where T: Debug + Clone + 'static, { /// Spawns a `GLib` future on the local thread, and calls `rx.recv()` /// in a loop, passing the message to `f`. /// /// This allows use of `GObjects` and futures in the same context. fn recv_glib(self, f: F) where F: FnMut(T) + 'static; /// Like [`BroadcastReceiverExt::recv_glib`], but the closure must return a [`Future`]. fn recv_glib_async(self, f: Fn) where Fn: FnMut(T) -> F + 'static, F: Future; } impl BroadcastReceiverExt for broadcast::Receiver where T: Debug + Clone + 'static, { fn recv_glib(mut self, mut f: F) where F: FnMut(T) + 'static, { glib::spawn_future_local(async move { loop { match self.recv().await { Ok(val) => f(val), Err(broadcast::error::RecvError::Lagged(count)) => { tracing::warn!( "Channel lagged behind by {count}, this may result in unexpected or broken behaviour" ); } Err(err) => { tracing::error!("{err:?}"); break; } } } }); } fn recv_glib_async(mut self, mut f: Fn) where Fn: FnMut(T) -> F + 'static, F: Future, { glib::spawn_future_local(async move { loop { match self.recv().await { Ok(val) => { f(val).await; } Err(broadcast::error::RecvError::Lagged(count)) => { tracing::warn!( "Channel lagged behind by {count}, this may result in unexpected or broken behaviour" ); } Err(err) => { tracing::error!("{err:?}"); break; } } } }); } }