Remove generic Hasher and Event from subscription::Recipe

This commit is contained in:
Héctor Ramón Jiménez 2023-03-05 04:15:10 +01:00
parent 5fed065dc3
commit f4cf488e0b
No known key found for this signature in database
GPG key ID: 140CC052C94F138E
20 changed files with 341 additions and 406 deletions

View file

@ -1,38 +1,35 @@
use crate::{BoxFuture, MaybeSend, Subscription};
use crate::core::event::{self, Event};
use crate::core::Hasher;
use crate::subscription::Recipe;
use crate::{BoxFuture, MaybeSend};
use futures::{
channel::mpsc,
sink::{Sink, SinkExt},
};
use std::{collections::HashMap, marker::PhantomData};
use futures::channel::mpsc;
use futures::sink::{Sink, SinkExt};
use std::collections::HashMap;
use std::hash::Hasher as _;
/// A registry of subscription streams.
///
/// If you have an application that continuously returns a [`Subscription`],
/// you can use a [`Tracker`] to keep track of the different recipes and keep
/// its executions alive.
#[derive(Debug)]
pub struct Tracker<Hasher, Event> {
subscriptions: HashMap<u64, Execution<Event>>,
_hasher: PhantomData<Hasher>,
#[derive(Debug, Default)]
pub struct Tracker {
subscriptions: HashMap<u64, Execution>,
}
#[derive(Debug)]
pub struct Execution<Event> {
pub struct Execution {
_cancel: futures::channel::oneshot::Sender<()>,
listener: Option<futures::channel::mpsc::Sender<Event>>,
listener: Option<futures::channel::mpsc::Sender<(Event, event::Status)>>,
}
impl<Hasher, Event> Tracker<Hasher, Event>
where
Hasher: std::hash::Hasher + Default,
Event: 'static + Send + Clone,
{
impl Tracker {
/// Creates a new empty [`Tracker`].
pub fn new() -> Self {
Self {
subscriptions: HashMap::new(),
_hasher: PhantomData,
}
}
@ -56,7 +53,7 @@ where
/// [`Recipe`]: crate::subscription::Recipe
pub fn update<Message, Receiver>(
&mut self,
subscription: Subscription<Hasher, Event, Message>,
recipes: impl Iterator<Item = Box<dyn Recipe<Output = Message>>>,
receiver: Receiver,
) -> Vec<BoxFuture<()>>
where
@ -70,8 +67,6 @@ where
use futures::stream::StreamExt;
let mut futures: Vec<BoxFuture<()>> = Vec::new();
let recipes = subscription.recipes();
let mut alive = std::collections::HashSet::new();
for recipe in recipes {
@ -142,12 +137,12 @@ where
/// currently open.
///
/// [`Recipe::stream`]: crate::subscription::Recipe::stream
pub fn broadcast(&mut self, event: Event) {
pub fn broadcast(&mut self, event: Event, status: event::Status) {
self.subscriptions
.values_mut()
.filter_map(|connection| connection.listener.as_mut())
.for_each(|listener| {
if let Err(error) = listener.try_send(event.clone()) {
if let Err(error) = listener.try_send((event.clone(), status)) {
log::warn!(
"Error sending event to subscription: {:?}",
error
@ -156,13 +151,3 @@ where
});
}
}
impl<Hasher, Event> Default for Tracker<Hasher, Event>
where
Hasher: std::hash::Hasher + Default,
Event: 'static + Send + Clone,
{
fn default() -> Self {
Self::new()
}
}