diff --git a/rs/hang-cli/src/client.rs b/rs/hang-cli/src/client.rs index fa14f569e..b815a1e46 100644 --- a/rs/hang-cli/src/client.rs +++ b/rs/hang-cli/src/client.rs @@ -6,12 +6,12 @@ use url::Url; pub async fn run_client(client: moq_native::Client, url: Url, name: String, publish: Publish) -> anyhow::Result<()> { // Create an origin producer to publish to the broadcast. let origin = moq_lite::Origin::produce(); - origin.producer.publish_broadcast(&name, publish.consume()); + origin.publish_broadcast(&name, publish.consume()); tracing::info!(%url, %name, "connecting"); // Establish the connection, not providing a subscriber. - let session = client.with_publish(origin.consumer).connect(url).await?; + let session = client.with_publish(origin.consume()).connect(url).await?; #[cfg(unix)] // Notify systemd that we're ready. diff --git a/rs/hang-cli/src/server.rs b/rs/hang-cli/src/server.rs index 02ce9def7..032459513 100644 --- a/rs/hang-cli/src/server.rs +++ b/rs/hang-cli/src/server.rs @@ -40,10 +40,10 @@ async fn run_session( ) -> anyhow::Result<()> { // Create an origin producer to publish to the broadcast. let origin = moq_lite::Origin::produce(); - origin.producer.publish_broadcast(&name, consumer); + origin.publish_broadcast(&name, consumer); // Blindly accept the session (WebTransport or QUIC), regardless of the URL. - let session = session.with_publish(origin.consumer).accept().await?; + let session = session.with_publish(origin.consume()).accept().await?; tracing::info!(id, "accepted session"); diff --git a/rs/hang/examples/video.rs b/rs/hang/examples/video.rs index 1b169d605..d566407a4 100644 --- a/rs/hang/examples/video.rs +++ b/rs/hang/examples/video.rs @@ -8,13 +8,14 @@ async fn main() -> anyhow::Result<()> { // Create an origin that we can publish to and the session can consume from. let origin = moq_lite::Origin::produce(); + let consumer = origin.consume(); // Run the broadcast production and the session in parallel. // This is a simple example of how you can concurrently run multiple tasks. // tokio::spawn works too. tokio::select! { - res = run_broadcast(origin.producer) => res, - res = run_session(origin.consumer) => res, + res = run_broadcast(origin) => res, + res = run_session(consumer) => res, } } @@ -93,7 +94,7 @@ fn create_track(broadcast: &mut moq_lite::BroadcastProducer) -> hang::TrackProdu .produce(); // Publish the catalog track to the broadcast. - broadcast.insert_track(catalog.consumer.track); + broadcast.insert_track(catalog.track.consume()); // Actually create the media track now. let track = broadcast.create_track(video_track); @@ -106,11 +107,11 @@ fn create_track(broadcast: &mut moq_lite::BroadcastProducer) -> hang::TrackProdu async fn run_broadcast(origin: moq_lite::OriginProducer) -> anyhow::Result<()> { // Create and publish a broadcast to the origin. let mut broadcast = moq_lite::Broadcast::produce(); - let mut track = create_track(&mut broadcast.producer); + let mut track = create_track(&mut broadcast); // NOTE: The path is empty because we're using the URL to scope the broadcast. // OPTIONAL: We publish after inserting the tracks just to avoid a nearly impossible race condition. - origin.publish_broadcast("", broadcast.consumer); + origin.publish_broadcast("", broadcast.consume()); // Not real frames of course. track.write(hang::Frame { diff --git a/rs/hang/src/catalog/root.rs b/rs/hang/src/catalog/root.rs index 28eb3c0d1..5c97e4aa7 100644 --- a/rs/hang/src/catalog/root.rs +++ b/rs/hang/src/catalog/root.rs @@ -7,7 +7,6 @@ use serde::{Deserialize, Serialize}; use crate::Result; use crate::catalog::{Audio, AudioConfig, Chat, User, Video, VideoConfig}; -use moq_lite::Produce; /// A catalog track, created by a broadcaster to describe the tracks available in a broadcast. #[serde_with::serde_as] @@ -83,13 +82,9 @@ impl Catalog { } /// Produce a catalog track that describes the available media tracks. - pub fn produce(self) -> Produce { + pub fn produce(self) -> CatalogProducer { let track = Catalog::default_track().produce(); - - Produce { - producer: CatalogProducer::new(track.producer, self), - consumer: track.consumer.into(), - } + CatalogProducer::new(track, self) } pub fn default_track() -> moq_lite::Track { diff --git a/rs/hang/src/import/aac.rs b/rs/hang/src/import/aac.rs index 745b4c9bd..8d012966d 100644 --- a/rs/hang/src/import/aac.rs +++ b/rs/hang/src/import/aac.rs @@ -114,13 +114,13 @@ impl Aac { tracing::debug!(name = ?track.name, ?config, "starting track"); let track = track.produce(); - self.broadcast.insert_track(track.consumer); + self.broadcast.insert_track(track.consume()); let mut catalog = self.broadcast.catalog.lock(); - let audio = catalog.insert_audio(track.producer.info.name.clone(), config); + let audio = catalog.insert_audio(track.info.name.clone(), config); audio.priority = 2; - self.track = Some(track.producer); + self.track = Some(track); Ok(()) } diff --git a/rs/hang/src/import/avc3.rs b/rs/hang/src/import/avc3.rs index af9171c13..5b0d19ef1 100644 --- a/rs/hang/src/import/avc3.rs +++ b/rs/hang/src/import/avc3.rs @@ -92,10 +92,10 @@ impl Avc3 { } let track = track.produce(); - self.broadcast.insert_track(track.consumer); + self.broadcast.insert_track(track.consume()); self.config = Some(config); - self.track = Some(track.producer.into()); + self.track = Some(track.into()); Ok(()) } diff --git a/rs/hang/src/import/hev1.rs b/rs/hang/src/import/hev1.rs index 4d203889b..6a0471a44 100644 --- a/rs/hang/src/import/hev1.rs +++ b/rs/hang/src/import/hev1.rs @@ -91,10 +91,10 @@ impl Hev1 { } let track = track.produce(); - self.broadcast.insert_track(track.consumer); + self.broadcast.insert_track(track.consume()); self.config = Some(config); - self.track = Some(track.producer.into()); + self.track = Some(track.into()); Ok(()) } diff --git a/rs/hang/src/import/hls.rs b/rs/hang/src/import/hls.rs index 284bdf39a..1779ed376 100644 --- a/rs/hang/src/import/hls.rs +++ b/rs/hang/src/import/hls.rs @@ -626,7 +626,7 @@ mod tests { #[test] fn hls_ingest_starts_without_importers() { - let broadcast = moq_lite::Broadcast::produce().producer.into(); + let broadcast = moq_lite::Broadcast::produce().into(); let url = "https://example.com/master.m3u8".to_string(); let cfg = HlsConfig::new(url); let hls = Hls::new(broadcast, cfg).unwrap(); diff --git a/rs/hang/src/import/opus.rs b/rs/hang/src/import/opus.rs index ee0ea90d0..d01ba0a11 100644 --- a/rs/hang/src/import/opus.rs +++ b/rs/hang/src/import/opus.rs @@ -60,13 +60,13 @@ impl Opus { tracing::debug!(name = ?track.name, ?config, "starting track"); let track = track.produce(); - self.broadcast.insert_track(track.consumer); + self.broadcast.insert_track(track.consume()); let mut catalog = self.broadcast.catalog.lock(); - let audio = catalog.insert_audio(track.producer.info.name.clone(), config); + let audio = catalog.insert_audio(track.info.name.clone(), config); audio.priority = 2; - self.track = Some(track.producer); + self.track = Some(track); Ok(()) } diff --git a/rs/hang/src/model/broadcast.rs b/rs/hang/src/model/broadcast.rs index ca6e671c7..c2407587d 100644 --- a/rs/hang/src/model/broadcast.rs +++ b/rs/hang/src/model/broadcast.rs @@ -19,11 +19,11 @@ pub struct BroadcastProducer { impl BroadcastProducer { pub fn new(mut inner: moq_lite::BroadcastProducer) -> Self { let catalog = Catalog::default().produce(); - inner.insert_track(catalog.consumer.track); + inner.insert_track(catalog.track.consume()); Self { inner, - catalog: catalog.producer, + catalog, track_id: Default::default(), } } diff --git a/rs/moq-clock/src/main.rs b/rs/moq-clock/src/main.rs index a1c14eb61..fb8583045 100644 --- a/rs/moq-clock/src/main.rs +++ b/rs/moq-clock/src/main.rs @@ -63,12 +63,12 @@ async fn main() -> anyhow::Result<()> { match config.role { Command::Publish => { let mut broadcast = moq_lite::Broadcast::produce(); - let track = broadcast.producer.create_track(track); + let track = broadcast.create_track(track); let clock = clock::Publisher::new(track); - origin.producer.publish_broadcast(&config.broadcast, broadcast.consumer); + origin.publish_broadcast(&config.broadcast, broadcast.consume()); - let session = client.with_publish(origin.consumer).connect(config.url).await?; + let session = client.with_publish(origin.consume()).connect(config.url).await?; tokio::select! { res = session.closed() => res.context("session closed"), @@ -76,7 +76,7 @@ async fn main() -> anyhow::Result<()> { } } Command::Subscribe => { - let session = client.with_consume(origin.producer).connect(config.url).await?; + let session = client.with_consume(origin.clone()).connect(config.url).await?; // NOTE: We could just call `session.consume_broadcast(&config.broadcast)` instead, // However that won't work with IETF MoQ and the current OriginConsumer API the moment. @@ -86,7 +86,7 @@ async fn main() -> anyhow::Result<()> { let path: moq_lite::Path<'_> = config.broadcast.into(); let mut origin = origin - .consumer + .consume() .consume_only(&[path]) .context("not allowed to consume broadcast")?; diff --git a/rs/moq-lite/src/ietf/publisher.rs b/rs/moq-lite/src/ietf/publisher.rs index 9c0224cc9..e3e08c79e 100644 --- a/rs/moq-lite/src/ietf/publisher.rs +++ b/rs/moq-lite/src/ietf/publisher.rs @@ -26,7 +26,7 @@ pub(super) struct Publisher { impl Publisher { pub fn new(session: S, origin: Option, control: Control, version: Version) -> Self { // Default to a dummy origin that is immediately closed. - let origin = origin.unwrap_or_else(|| Origin::produce().consumer); + let origin = origin.unwrap_or_else(|| Origin::produce().consume()); Self { session, origin, diff --git a/rs/moq-lite/src/ietf/subscriber.rs b/rs/moq-lite/src/ietf/subscriber.rs index b8ace558d..3d729a647 100644 --- a/rs/moq-lite/src/ietf/subscriber.rs +++ b/rs/moq-lite/src/ietf/subscriber.rs @@ -88,12 +88,12 @@ impl Subscriber { } Entry::Vacant(entry) => { let broadcast = Broadcast::produce(); - origin.publish_broadcast(path.clone(), broadcast.consumer); + origin.publish_broadcast(path.clone(), broadcast.consume()); entry.insert(BroadcastState { - producer: broadcast.producer.clone(), + producer: broadcast.clone(), count: 1, }); - broadcast.producer + broadcast } }; @@ -451,7 +451,7 @@ impl Subscriber { match state.subscribes.entry(request_id) { Entry::Vacant(entry) => { entry.insert(TrackState { - producer: track.producer, + producer: track.clone(), alias: Some(msg.track_alias), }); } @@ -466,7 +466,7 @@ impl Subscriber { // NOTE: This is debated in the IETF draft, but is significantly easier to implement. let mut broadcast = self.start_announce(msg.track_namespace.to_owned())?; - let exists = broadcast.insert_track(track.consumer); + let exists = broadcast.insert_track(track.consume()); if exists { tracing::warn!(track = %msg.track_name, "track already exists, replacing it"); } diff --git a/rs/moq-lite/src/lite/publisher.rs b/rs/moq-lite/src/lite/publisher.rs index d32b3aca8..c90ef99f5 100644 --- a/rs/moq-lite/src/lite/publisher.rs +++ b/rs/moq-lite/src/lite/publisher.rs @@ -22,7 +22,7 @@ pub(super) struct Publisher { impl Publisher { pub fn new(session: S, origin: Option, version: Version) -> Self { // Default to a dummy origin that is immediately closed. - let origin = origin.unwrap_or_else(|| Origin::produce().consumer); + let origin = origin.unwrap_or_else(|| Origin::produce().consume()); Self { session, origin, diff --git a/rs/moq-lite/src/lite/subscriber.rs b/rs/moq-lite/src/lite/subscriber.rs index 9054ca9bd..41bbd3007 100644 --- a/rs/moq-lite/src/lite/subscriber.rs +++ b/rs/moq-lite/src/lite/subscriber.rs @@ -134,16 +134,16 @@ impl Subscriber { // Make sure the peer doesn't double announce. match producers.entry(path.to_owned()) { Entry::Occupied(_) => return Err(Error::Duplicate), - Entry::Vacant(entry) => entry.insert(broadcast.producer.clone()), + Entry::Vacant(entry) => entry.insert(broadcast.clone()), }; // Run the broadcast in the background until all consumers are dropped. self.origin .as_mut() .unwrap() - .publish_broadcast(path.clone(), broadcast.consumer); + .publish_broadcast(path.clone(), broadcast.consume()); - web_async::spawn(self.clone().run_broadcast(path, broadcast.producer)); + web_async::spawn(self.clone().run_broadcast(path, broadcast)); Ok(()) } diff --git a/rs/moq-lite/src/model/broadcast.rs b/rs/moq-lite/src/model/broadcast.rs index c8affe410..e8eb62287 100644 --- a/rs/moq-lite/src/model/broadcast.rs +++ b/rs/moq-lite/src/model/broadcast.rs @@ -7,7 +7,7 @@ use std::{ }, }; -use crate::{Error, Produce, TrackConsumer, TrackProducer}; +use crate::{Error, TrackConsumer, TrackProducer}; use tokio::sync::watch; use web_async::Lock; @@ -32,10 +32,8 @@ pub struct Broadcast { } impl Broadcast { - pub fn produce() -> Produce { - let producer = BroadcastProducer::new(); - let consumer = producer.consume(); - Produce { producer, consumer } + pub fn produce() -> BroadcastProducer { + BroadcastProducer::new() } } @@ -76,9 +74,9 @@ impl BroadcastProducer { /// Produce a new track and insert it into the broadcast. pub fn create_track(&mut self, track: Track) -> TrackProducer { - let track = track.clone().produce(); - self.insert_track(track.consumer); - track.producer + let track = track.produce(); + self.insert_track(track.consume()); + track } /// Insert a track into the lookup, returning true if it was unique. @@ -205,9 +203,8 @@ impl BroadcastConsumer { } // Otherwise we have never seen this track before and need to create a new producer. - let track = track.clone().produce(); - let producer = track.producer; - let consumer = track.consumer; + let producer = track.clone().produce(); + let consumer = producer.consume(); // Insert the producer into the lookup so we will deduplicate requests. // This is not a subscriber so it doesn't count towards "used" subscribers. @@ -271,22 +268,22 @@ mod test { let mut track1 = Track::new("track1").produce(); // Make sure we can insert before a consumer is created. - producer.insert_track(track1.consumer); - track1.producer.append_group(); + producer.insert_track(track1.consume()); + track1.append_group(); let consumer = producer.consume(); - let mut track1_sub = consumer.subscribe_track(&track1.producer.info); + let mut track1_sub = consumer.subscribe_track(&track1.info); track1_sub.assert_group(); let mut track2 = Track::new("track2").produce(); - producer.insert_track(track2.consumer); + producer.insert_track(track2.consume()); let consumer2 = producer.consume(); - let mut track2_consumer = consumer2.subscribe_track(&track2.producer.info); + let mut track2_consumer = consumer2.subscribe_track(&track2.info); track2_consumer.assert_no_group(); - track2.producer.append_group(); + track2.append_group(); track2_consumer.assert_group(); } @@ -334,10 +331,10 @@ mod test { // Create a new track and insert it into the broadcast. let mut track1 = Track::new("track1").produce(); - track1.producer.append_group(); - producer.insert_track(track1.consumer); + track1.append_group(); + producer.insert_track(track1.consume()); - let mut track1c = consumer.subscribe_track(&track1.producer.info); + let mut track1c = consumer.subscribe_track(&track1.info); let track2 = consumer.subscribe_track(&Track::new("track2")); drop(producer); @@ -352,7 +349,7 @@ mod test { track1c.assert_not_closed(); // TODO: We should probably cascade the closed state. - drop(track1.producer); + drop(track1); track1c.assert_closed(); } @@ -408,12 +405,13 @@ mod test { #[tokio::test] async fn requested_unused() { let mut broadcast = Broadcast::produce(); + let consumer = broadcast.consume(); // Subscribe to a track that doesn't exist - this creates a request - let consumer1 = broadcast.consumer.subscribe_track(&Track::new("unknown_track")); + let consumer1 = consumer.subscribe_track(&Track::new("unknown_track")); // Get the requested track producer - let producer1 = broadcast.producer.assert_request(); + let producer1 = broadcast.assert_request(); // The track producer should NOT be unused yet because there's a consumer assert!( @@ -422,7 +420,7 @@ mod test { ); // Making a new consumer will keep the producer alive - let consumer2 = broadcast.consumer.subscribe_track(&Track::new("unknown_track")); + let consumer2 = consumer.subscribe_track(&Track::new("unknown_track")); consumer2.assert_is_clone(&consumer1); // Drop the consumer subscription @@ -449,8 +447,8 @@ mod test { tokio::time::sleep(std::time::Duration::from_millis(1)).await; // Now the cleanup task should have run and we can subscribe again to the unknown track. - let consumer3 = broadcast.consumer.subscribe_track(&Track::new("unknown_track")); - let producer2 = broadcast.producer.assert_request(); + let consumer3 = consumer.subscribe_track(&Track::new("unknown_track")); + let producer2 = broadcast.assert_request(); // Drop the consumer, now the producer should be unused drop(consumer3); diff --git a/rs/moq-lite/src/model/frame.rs b/rs/moq-lite/src/model/frame.rs index 23c2b29f9..140f3e61c 100644 --- a/rs/moq-lite/src/model/frame.rs +++ b/rs/moq-lite/src/model/frame.rs @@ -3,7 +3,7 @@ use std::future::Future; use bytes::{Bytes, BytesMut}; use tokio::sync::watch; -use crate::{Error, Produce, Result}; +use crate::{Error, Result}; /// A chunk of data with an upfront size. #[derive(Clone, Debug)] @@ -13,11 +13,9 @@ pub struct Frame { } impl Frame { - /// Create a new producer and consumer for the frame. - pub fn produce(self) -> Produce { - let producer = FrameProducer::new(self); - let consumer = producer.consume(); - Produce { producer, consumer } + /// Create a new producer for the frame. + pub fn produce(self) -> FrameProducer { + FrameProducer::new(self) } } diff --git a/rs/moq-lite/src/model/group.rs b/rs/moq-lite/src/model/group.rs index 1bd440769..62aa301b8 100644 --- a/rs/moq-lite/src/model/group.rs +++ b/rs/moq-lite/src/model/group.rs @@ -12,7 +12,7 @@ use std::future::Future; use bytes::Bytes; use tokio::sync::watch; -use crate::{Error, Produce, Result}; +use crate::{Error, Result}; use super::{Frame, FrameConsumer, FrameProducer}; @@ -26,10 +26,8 @@ pub struct Group { } impl Group { - pub fn produce(self) -> Produce { - let producer = GroupProducer::new(self); - let consumer = producer.consume(); - Produce { producer, consumer } + pub fn produce(self) -> GroupProducer { + GroupProducer::new(self) } } @@ -106,9 +104,9 @@ impl GroupProducer { /// Create a frame with an upfront size pub fn create_frame(&mut self, info: Frame) -> FrameProducer { - let frame = Frame::produce(info); - self.append_frame(frame.consumer); - frame.producer + let frame = info.produce(); + self.append_frame(frame.consume()); + frame } /// Append a frame to the group. diff --git a/rs/moq-lite/src/model/mod.rs b/rs/moq-lite/src/model/mod.rs index c3bd0f5fc..460a300df 100644 --- a/rs/moq-lite/src/model/mod.rs +++ b/rs/moq-lite/src/model/mod.rs @@ -2,7 +2,6 @@ mod broadcast; mod frame; mod group; mod origin; -mod produce; mod time; mod track; @@ -10,6 +9,5 @@ pub use broadcast::*; pub use frame::*; pub use group::*; pub use origin::*; -pub use produce::*; pub use time::*; pub use track::*; diff --git a/rs/moq-lite/src/model/origin.rs b/rs/moq-lite/src/model/origin.rs index f0dff3613..630f46f99 100644 --- a/rs/moq-lite/src/model/origin.rs +++ b/rs/moq-lite/src/model/origin.rs @@ -6,7 +6,7 @@ use tokio::sync::mpsc; use web_async::Lock; use super::BroadcastConsumer; -use crate::{AsPath, Broadcast, BroadcastProducer, Path, PathOwned, Produce}; +use crate::{AsPath, Broadcast, BroadcastProducer, Path, PathOwned}; static NEXT_CONSUMER_ID: AtomicU64 = AtomicU64::new(0); @@ -334,10 +334,8 @@ pub type OriginAnnounce = (PathOwned, Option); pub struct Origin {} impl Origin { - pub fn produce() -> Produce { - let producer = OriginProducer::default(); - let consumer = producer.consume(); - Produce { producer, consumer } + pub fn produce() -> OriginProducer { + OriginProducer::default() } } @@ -359,8 +357,7 @@ impl OriginProducer { /// Returns [None] if the broadcast is not allowed to be published. pub fn create_broadcast(&self, path: impl AsPath) -> Option { let broadcast = Broadcast::produce(); - self.publish_broadcast(path, broadcast.consumer) - .then_some(broadcast.producer) + self.publish_broadcast(path, broadcast.consume()).then_some(broadcast) } /// Publish a broadcast, announcing it to all consumers. @@ -611,32 +608,32 @@ mod tests { let broadcast1 = Broadcast::produce(); let broadcast2 = Broadcast::produce(); - let mut consumer1 = origin.consumer; + let mut consumer1 = origin.consume(); // Make a new consumer that should get it. consumer1.assert_next_wait(); // Publish the first broadcast. - origin.producer.publish_broadcast("test1", broadcast1.consumer); + origin.publish_broadcast("test1", broadcast1.consume()); - consumer1.assert_next("test1", &broadcast1.producer.consume()); + consumer1.assert_next("test1", &broadcast1.consume()); consumer1.assert_next_wait(); // Make a new consumer that should get the existing broadcast. // But we don't consume it yet. - let mut consumer2 = origin.producer.consume(); + let mut consumer2 = origin.consume(); // Publish the second broadcast. - origin.producer.publish_broadcast("test2", broadcast2.consumer); + origin.publish_broadcast("test2", broadcast2.consume()); - consumer1.assert_next("test2", &broadcast2.producer.consume()); + consumer1.assert_next("test2", &broadcast2.consume()); consumer1.assert_next_wait(); - consumer2.assert_next("test1", &broadcast1.producer.consume()); - consumer2.assert_next("test2", &broadcast2.producer.consume()); + consumer2.assert_next("test1", &broadcast1.consume()); + consumer2.assert_next("test2", &broadcast2.consume()); consumer2.assert_next_wait(); // Close the first broadcast. - drop(broadcast1.producer); + drop(broadcast1); // Wait for the async task to run. tokio::time::sleep(tokio::time::Duration::from_millis(1)).await; @@ -648,12 +645,12 @@ mod tests { consumer2.assert_next_wait(); // And a new consumer only gets the last broadcast. - let mut consumer3 = origin.producer.consume(); - consumer3.assert_next("test2", &broadcast2.producer.consume()); + let mut consumer3 = origin.consume(); + consumer3.assert_next("test2", &broadcast2.consume()); consumer3.assert_next_wait(); // Close the other producer and make sure it cleans up - drop(broadcast2.producer); + drop(broadcast2); // Wait for the async task to run. tokio::time::sleep(tokio::time::Duration::from_millis(1)).await; @@ -671,173 +668,176 @@ mod tests { #[tokio::test] async fn test_duplicate() { - let mut origin = Origin::produce(); + let origin = Origin::produce(); + let mut consumer = origin.consume(); let broadcast1 = Broadcast::produce(); let broadcast2 = Broadcast::produce(); let broadcast3 = Broadcast::produce(); - let consumer1 = broadcast1.consumer; - let consumer2 = broadcast2.consumer; - let consumer3 = broadcast3.consumer; + let consumer1 = broadcast1.consume(); + let consumer2 = broadcast2.consume(); + let consumer3 = broadcast3.consume(); - origin.producer.publish_broadcast("test", consumer1.clone()); - origin.producer.publish_broadcast("test", consumer2.clone()); - origin.producer.publish_broadcast("test", consumer3.clone()); + origin.publish_broadcast("test", consumer1.clone()); + origin.publish_broadcast("test", consumer2.clone()); + origin.publish_broadcast("test", consumer3.clone()); - assert!(origin.consumer.consume_broadcast("test").is_some()); + assert!(consumer.consume_broadcast("test").is_some()); - origin.consumer.assert_next("test", &consumer1); - origin.consumer.assert_next_none("test"); - origin.consumer.assert_next("test", &consumer2); - origin.consumer.assert_next_none("test"); - origin.consumer.assert_next("test", &consumer3); + consumer.assert_next("test", &consumer1); + consumer.assert_next_none("test"); + consumer.assert_next("test", &consumer2); + consumer.assert_next_none("test"); + consumer.assert_next("test", &consumer3); // Drop the backup, nothing should change. - drop(broadcast2.producer); + drop(broadcast2); // Wait for the async task to run. tokio::time::sleep(tokio::time::Duration::from_millis(1)).await; - assert!(origin.consumer.consume_broadcast("test").is_some()); - origin.consumer.assert_next_wait(); + assert!(consumer.consume_broadcast("test").is_some()); + consumer.assert_next_wait(); // Drop the active, we should reannounce. - drop(broadcast3.producer); + drop(broadcast3); // Wait for the async task to run. tokio::time::sleep(tokio::time::Duration::from_millis(1)).await; - assert!(origin.consumer.consume_broadcast("test").is_some()); - origin.consumer.assert_next_none("test"); - origin.consumer.assert_next("test", &consumer1); + assert!(consumer.consume_broadcast("test").is_some()); + consumer.assert_next_none("test"); + consumer.assert_next("test", &consumer1); // Drop the final broadcast, we should unannounce. - drop(broadcast1.producer); + drop(broadcast1); // Wait for the async task to run. tokio::time::sleep(tokio::time::Duration::from_millis(1)).await; - assert!(origin.consumer.consume_broadcast("test").is_none()); + assert!(consumer.consume_broadcast("test").is_none()); - origin.consumer.assert_next_none("test"); - origin.consumer.assert_next_wait(); + consumer.assert_next_none("test"); + consumer.assert_next_wait(); } #[tokio::test] async fn test_duplicate_reverse() { let origin = Origin::produce(); + let consumer = origin.consume(); let broadcast1 = Broadcast::produce(); let broadcast2 = Broadcast::produce(); - origin.producer.publish_broadcast("test", broadcast1.consumer); - origin.producer.publish_broadcast("test", broadcast2.consumer); - assert!(origin.consumer.consume_broadcast("test").is_some()); + origin.publish_broadcast("test", broadcast1.consume()); + origin.publish_broadcast("test", broadcast2.consume()); + assert!(consumer.consume_broadcast("test").is_some()); // This is harder, dropping the new broadcast first. - drop(broadcast2.producer); + drop(broadcast2); // Wait for the cleanup async task to run. tokio::time::sleep(tokio::time::Duration::from_millis(1)).await; - assert!(origin.consumer.consume_broadcast("test").is_some()); + assert!(consumer.consume_broadcast("test").is_some()); - drop(broadcast1.producer); + drop(broadcast1); // Wait for the cleanup async task to run. tokio::time::sleep(tokio::time::Duration::from_millis(1)).await; - assert!(origin.consumer.consume_broadcast("test").is_none()); + assert!(consumer.consume_broadcast("test").is_none()); } #[tokio::test] async fn test_double_publish() { let origin = Origin::produce(); + let consumer = origin.consume(); let broadcast = Broadcast::produce(); // Ensure it doesn't crash. - origin.producer.publish_broadcast("test", broadcast.producer.consume()); - origin.producer.publish_broadcast("test", broadcast.producer.consume()); + origin.publish_broadcast("test", broadcast.consume()); + origin.publish_broadcast("test", broadcast.consume()); - assert!(origin.consumer.consume_broadcast("test").is_some()); + assert!(consumer.consume_broadcast("test").is_some()); - drop(broadcast.producer); + drop(broadcast); // Wait for the async task to run. tokio::time::sleep(tokio::time::Duration::from_millis(1)).await; - assert!(origin.consumer.consume_broadcast("test").is_none()); + assert!(consumer.consume_broadcast("test").is_none()); } // There was a tokio bug where only the first 127 broadcasts would be received instantly. #[tokio::test] #[should_panic] async fn test_128() { - let mut origin = Origin::produce(); + let origin = Origin::produce(); + let mut consumer = origin.consume(); let broadcast = Broadcast::produce(); for i in 0..256 { - origin - .producer - .publish_broadcast(format!("test{i}"), broadcast.consumer.clone()); + origin.publish_broadcast(format!("test{i}"), broadcast.consume()); } for i in 0..256 { - origin.consumer.assert_next(format!("test{i}"), &broadcast.consumer); + consumer.assert_next(format!("test{i}"), &broadcast.consume()); } } #[tokio::test] async fn test_128_fix() { - let mut origin = Origin::produce(); + let origin = Origin::produce(); + let mut consumer = origin.consume(); let broadcast = Broadcast::produce(); for i in 0..256 { - origin - .producer - .publish_broadcast(format!("test{i}"), broadcast.consumer.clone()); + origin.publish_broadcast(format!("test{i}"), broadcast.consume()); } for i in 0..256 { // try_next does not have the same issue because it's synchronous. - origin.consumer.assert_try_next(format!("test{i}"), &broadcast.consumer); + consumer.assert_try_next(format!("test{i}"), &broadcast.consume()); } } #[tokio::test] async fn test_with_root_basic() { - let mut origin = Origin::produce(); + let origin = Origin::produce(); + let mut consumer = origin.consume(); let broadcast = Broadcast::produce(); // Create a producer with root "/foo" - let foo_producer = origin.producer.with_root("foo").expect("should create root"); + let foo_producer = origin.with_root("foo").expect("should create root"); assert_eq!(foo_producer.root().as_str(), "foo"); // When publishing to "bar/baz", it should actually publish to "foo/bar/baz" - assert!(foo_producer.publish_broadcast("bar/baz", broadcast.consumer.clone())); + assert!(foo_producer.publish_broadcast("bar/baz", broadcast.consume())); // The original consumer should see the full path - origin.consumer.assert_next("foo/bar/baz", &broadcast.consumer); + consumer.assert_next("foo/bar/baz", &broadcast.consume()); // A consumer created from the rooted producer should see the stripped path let mut foo_consumer = foo_producer.consume(); - foo_consumer.assert_next("bar/baz", &broadcast.consumer); + foo_consumer.assert_next("bar/baz", &broadcast.consume()); } #[tokio::test] async fn test_with_root_nested() { - let mut origin = Origin::produce(); + let origin = Origin::produce(); + let mut consumer = origin.consume(); let broadcast = Broadcast::produce(); // Create nested roots - let foo_producer = origin.producer.with_root("foo").expect("should create foo root"); + let foo_producer = origin.with_root("foo").expect("should create foo root"); let foo_bar_producer = foo_producer.with_root("bar").expect("should create bar root"); assert_eq!(foo_bar_producer.root().as_str(), "foo/bar"); // Publishing to "baz" should actually publish to "foo/bar/baz" - assert!(foo_bar_producer.publish_broadcast("baz", broadcast.consumer.clone())); + assert!(foo_bar_producer.publish_broadcast("baz", broadcast.consume())); // The original consumer sees the full path - origin.consumer.assert_next("foo/bar/baz", &broadcast.consumer); + consumer.assert_next("foo/bar/baz", &broadcast.consume()); // Consumer from foo_bar_producer sees just "baz" let mut foo_bar_consumer = foo_bar_producer.consume(); - foo_bar_consumer.assert_next("baz", &broadcast.consumer); + foo_bar_consumer.assert_next("baz", &broadcast.consume()); } #[tokio::test] @@ -847,19 +847,18 @@ mod tests { // Create a producer that can only publish to "allowed" paths let limited_producer = origin - .producer .publish_only(&["allowed/path1".into(), "allowed/path2".into()]) .expect("should create limited producer"); // Should be able to publish to allowed paths - assert!(limited_producer.publish_broadcast("allowed/path1", broadcast.consumer.clone())); - assert!(limited_producer.publish_broadcast("allowed/path1/nested", broadcast.consumer.clone())); - assert!(limited_producer.publish_broadcast("allowed/path2", broadcast.consumer.clone())); + assert!(limited_producer.publish_broadcast("allowed/path1", broadcast.consume())); + assert!(limited_producer.publish_broadcast("allowed/path1/nested", broadcast.consume())); + assert!(limited_producer.publish_broadcast("allowed/path2", broadcast.consume())); // Should not be able to publish to disallowed paths - assert!(!limited_producer.publish_broadcast("notallowed", broadcast.consumer.clone())); - assert!(!limited_producer.publish_broadcast("allowed", broadcast.consumer.clone())); // Parent of allowed path - assert!(!limited_producer.publish_broadcast("other/path", broadcast.consumer.clone())); + assert!(!limited_producer.publish_broadcast("notallowed", broadcast.consume())); + assert!(!limited_producer.publish_broadcast("allowed", broadcast.consume())); // Parent of allowed path + assert!(!limited_producer.publish_broadcast("other/path", broadcast.consume())); } #[tokio::test] @@ -867,79 +866,68 @@ mod tests { let origin = Origin::produce(); // Creating a producer with no allowed paths should return None - assert!(origin.producer.publish_only(&[]).is_none()); + assert!(origin.publish_only(&[]).is_none()); } #[tokio::test] async fn test_consume_only_filters() { - let mut origin = Origin::produce(); + let origin = Origin::produce(); + let mut consumer = origin.consume(); let broadcast1 = Broadcast::produce(); let broadcast2 = Broadcast::produce(); let broadcast3 = Broadcast::produce(); // Publish to different paths - origin - .producer - .publish_broadcast("allowed", broadcast1.consumer.clone()); - origin - .producer - .publish_broadcast("allowed/nested", broadcast2.consumer.clone()); - origin - .producer - .publish_broadcast("notallowed", broadcast3.consumer.clone()); + origin.publish_broadcast("allowed", broadcast1.consume()); + origin.publish_broadcast("allowed/nested", broadcast2.consume()); + origin.publish_broadcast("notallowed", broadcast3.consume()); // Create a consumer that only sees "allowed" paths - let mut limited_consumer = origin - .consumer + let mut limited_consumer = consumer .consume_only(&["allowed".into()]) .expect("should create limited consumer"); // Should only receive broadcasts under "allowed" - limited_consumer.assert_next("allowed", &broadcast1.consumer); - limited_consumer.assert_next("allowed/nested", &broadcast2.consumer); + limited_consumer.assert_next("allowed", &broadcast1.consume()); + limited_consumer.assert_next("allowed/nested", &broadcast2.consume()); limited_consumer.assert_next_wait(); // Should not see "notallowed" // Original consumer should see all - origin.consumer.assert_next("allowed", &broadcast1.consumer); - origin.consumer.assert_next("allowed/nested", &broadcast2.consumer); - origin.consumer.assert_next("notallowed", &broadcast3.consumer); + consumer.assert_next("allowed", &broadcast1.consume()); + consumer.assert_next("allowed/nested", &broadcast2.consume()); + consumer.assert_next("notallowed", &broadcast3.consume()); } #[tokio::test] async fn test_consume_only_multiple_prefixes() { let origin = Origin::produce(); + let consumer = origin.consume(); let broadcast1 = Broadcast::produce(); let broadcast2 = Broadcast::produce(); let broadcast3 = Broadcast::produce(); - origin - .producer - .publish_broadcast("foo/test", broadcast1.consumer.clone()); - origin - .producer - .publish_broadcast("bar/test", broadcast2.consumer.clone()); - origin - .producer - .publish_broadcast("baz/test", broadcast3.consumer.clone()); + origin.publish_broadcast("foo/test", broadcast1.consume()); + origin.publish_broadcast("bar/test", broadcast2.consume()); + origin.publish_broadcast("baz/test", broadcast3.consume()); // Consumer that only sees "foo" and "bar" paths - let mut limited_consumer = origin - .consumer + let mut limited_consumer = consumer .consume_only(&["foo".into(), "bar".into()]) .expect("should create limited consumer"); - limited_consumer.assert_next("foo/test", &broadcast1.consumer); - limited_consumer.assert_next("bar/test", &broadcast2.consumer); + limited_consumer.assert_next("foo/test", &broadcast1.consume()); + limited_consumer.assert_next("bar/test", &broadcast2.consume()); limited_consumer.assert_next_wait(); // Should not see "baz/test" } #[tokio::test] async fn test_with_root_and_publish_only() { - let mut origin = Origin::produce(); + let origin = Origin::produce(); + let mut consumer = origin.consume(); let broadcast = Broadcast::produce(); // User connects to /foo root - let foo_producer = origin.producer.with_root("foo").expect("should create foo root"); + let foo_producer = origin.with_root("foo").expect("should create foo root"); // Limit them to publish only to "bar" and "goop/pee" within /foo let limited_producer = foo_producer @@ -947,21 +935,21 @@ mod tests { .expect("should create limited producer"); // Should be able to publish to foo/bar and foo/goop/pee (but user sees as bar and goop/pee) - assert!(limited_producer.publish_broadcast("bar", broadcast.consumer.clone())); - assert!(limited_producer.publish_broadcast("bar/nested", broadcast.consumer.clone())); - assert!(limited_producer.publish_broadcast("goop/pee", broadcast.consumer.clone())); - assert!(limited_producer.publish_broadcast("goop/pee/nested", broadcast.consumer.clone())); + assert!(limited_producer.publish_broadcast("bar", broadcast.consume())); + assert!(limited_producer.publish_broadcast("bar/nested", broadcast.consume())); + assert!(limited_producer.publish_broadcast("goop/pee", broadcast.consume())); + assert!(limited_producer.publish_broadcast("goop/pee/nested", broadcast.consume())); // Should not be able to publish outside allowed paths - assert!(!limited_producer.publish_broadcast("baz", broadcast.consumer.clone())); - assert!(!limited_producer.publish_broadcast("goop", broadcast.consumer.clone())); // Parent of allowed - assert!(!limited_producer.publish_broadcast("goop/other", broadcast.consumer.clone())); + assert!(!limited_producer.publish_broadcast("baz", broadcast.consume())); + assert!(!limited_producer.publish_broadcast("goop", broadcast.consume())); // Parent of allowed + assert!(!limited_producer.publish_broadcast("goop/other", broadcast.consume())); // Original consumer sees full paths - origin.consumer.assert_next("foo/bar", &broadcast.consumer); - origin.consumer.assert_next("foo/bar/nested", &broadcast.consumer); - origin.consumer.assert_next("foo/goop/pee", &broadcast.consumer); - origin.consumer.assert_next("foo/goop/pee/nested", &broadcast.consumer); + consumer.assert_next("foo/bar", &broadcast.consume()); + consumer.assert_next("foo/bar/nested", &broadcast.consume()); + consumer.assert_next("foo/goop/pee", &broadcast.consume()); + consumer.assert_next("foo/goop/pee/nested", &broadcast.consume()); } #[tokio::test] @@ -972,18 +960,12 @@ mod tests { let broadcast3 = Broadcast::produce(); // Publish broadcasts - origin - .producer - .publish_broadcast("foo/bar/test", broadcast1.consumer.clone()); - origin - .producer - .publish_broadcast("foo/goop/pee/test", broadcast2.consumer.clone()); - origin - .producer - .publish_broadcast("foo/other/test", broadcast3.consumer.clone()); + origin.publish_broadcast("foo/bar/test", broadcast1.consume()); + origin.publish_broadcast("foo/goop/pee/test", broadcast2.consume()); + origin.publish_broadcast("foo/other/test", broadcast3.consume()); // User connects to /foo root - let foo_producer = origin.producer.with_root("foo").expect("should create foo root"); + let foo_producer = origin.with_root("foo").expect("should create foo root"); // Create consumer limited to "bar" and "goop/pee" within /foo let mut limited_consumer = foo_producer @@ -991,8 +973,8 @@ mod tests { .expect("should create limited consumer"); // Should only see allowed paths (without foo prefix) - limited_consumer.assert_next("bar/test", &broadcast1.consumer); - limited_consumer.assert_next("goop/pee/test", &broadcast2.consumer); + limited_consumer.assert_next("bar/test", &broadcast1.consume()); + limited_consumer.assert_next("goop/pee/test", &broadcast2.consume()); limited_consumer.assert_next_wait(); // Should not see "other/test" } @@ -1002,7 +984,6 @@ mod tests { // First limit the producer to specific paths let limited_producer = origin - .producer .publish_only(&["allowed".into()]) .expect("should create limited producer"); @@ -1022,11 +1003,11 @@ mod tests { let broadcast = Broadcast::produce(); // Producer with root access (empty string means wildcard) - let root_producer = origin.producer.clone(); + let root_producer = origin.clone(); // Should be able to publish anywhere - assert!(root_producer.publish_broadcast("any/path", broadcast.consumer.clone())); - assert!(root_producer.publish_broadcast("other/path", broadcast.consumer.clone())); + assert!(root_producer.publish_broadcast("any/path", broadcast.consume())); + assert!(root_producer.publish_broadcast("other/path", broadcast.consume())); // Can create any root let foo_producer = root_producer.with_root("foo").expect("should create any root"); @@ -1036,33 +1017,29 @@ mod tests { #[tokio::test] async fn test_consume_broadcast_with_permissions() { let origin = Origin::produce(); + let consumer = origin.consume(); let broadcast1 = Broadcast::produce(); let broadcast2 = Broadcast::produce(); - origin - .producer - .publish_broadcast("allowed/test", broadcast1.consumer.clone()); - origin - .producer - .publish_broadcast("notallowed/test", broadcast2.consumer.clone()); + origin.publish_broadcast("allowed/test", broadcast1.consume()); + origin.publish_broadcast("notallowed/test", broadcast2.consume()); // Create limited consumer - let limited_consumer = origin - .consumer + let limited_consumer = consumer .consume_only(&["allowed".into()]) .expect("should create limited consumer"); // Should be able to get allowed broadcast let result = limited_consumer.consume_broadcast("allowed/test"); assert!(result.is_some()); - assert!(result.unwrap().is_clone(&broadcast1.consumer)); + assert!(result.unwrap().is_clone(&broadcast1.consume())); // Should not be able to get disallowed broadcast assert!(limited_consumer.consume_broadcast("notallowed/test").is_none()); // Original consumer can get both - assert!(origin.consumer.consume_broadcast("allowed/test").is_some()); - assert!(origin.consumer.consume_broadcast("notallowed/test").is_some()); + assert!(consumer.consume_broadcast("allowed/test").is_some()); + assert!(consumer.consume_broadcast("notallowed/test").is_some()); } #[tokio::test] @@ -1072,64 +1049,55 @@ mod tests { // Create producer limited to "a/b/c" let limited_producer = origin - .producer .publish_only(&["a/b/c".into()]) .expect("should create limited producer"); // Should be able to publish to exact path and nested paths - assert!(limited_producer.publish_broadcast("a/b/c", broadcast.consumer.clone())); - assert!(limited_producer.publish_broadcast("a/b/c/d", broadcast.consumer.clone())); - assert!(limited_producer.publish_broadcast("a/b/c/d/e", broadcast.consumer.clone())); + assert!(limited_producer.publish_broadcast("a/b/c", broadcast.consume())); + assert!(limited_producer.publish_broadcast("a/b/c/d", broadcast.consume())); + assert!(limited_producer.publish_broadcast("a/b/c/d/e", broadcast.consume())); // Should not be able to publish to parent or sibling paths - assert!(!limited_producer.publish_broadcast("a", broadcast.consumer.clone())); - assert!(!limited_producer.publish_broadcast("a/b", broadcast.consumer.clone())); - assert!(!limited_producer.publish_broadcast("a/b/other", broadcast.consumer.clone())); + assert!(!limited_producer.publish_broadcast("a", broadcast.consume())); + assert!(!limited_producer.publish_broadcast("a/b", broadcast.consume())); + assert!(!limited_producer.publish_broadcast("a/b/other", broadcast.consume())); } #[tokio::test] async fn test_multiple_consumers_with_different_permissions() { let origin = Origin::produce(); + let consumer = origin.consume(); let broadcast1 = Broadcast::produce(); let broadcast2 = Broadcast::produce(); let broadcast3 = Broadcast::produce(); // Publish to different paths - origin - .producer - .publish_broadcast("foo/test", broadcast1.consumer.clone()); - origin - .producer - .publish_broadcast("bar/test", broadcast2.consumer.clone()); - origin - .producer - .publish_broadcast("baz/test", broadcast3.consumer.clone()); + origin.publish_broadcast("foo/test", broadcast1.consume()); + origin.publish_broadcast("bar/test", broadcast2.consume()); + origin.publish_broadcast("baz/test", broadcast3.consume()); // Create consumers with different permissions - let mut foo_consumer = origin - .consumer + let mut foo_consumer = consumer .consume_only(&["foo".into()]) .expect("should create foo consumer"); - let mut bar_consumer = origin - .consumer + let mut bar_consumer = consumer .consume_only(&["bar".into()]) .expect("should create bar consumer"); - let mut foobar_consumer = origin - .consumer + let mut foobar_consumer = consumer .consume_only(&["foo".into(), "bar".into()]) .expect("should create foobar consumer"); // Each consumer should only see their allowed paths - foo_consumer.assert_next("foo/test", &broadcast1.consumer); + foo_consumer.assert_next("foo/test", &broadcast1.consume()); foo_consumer.assert_next_wait(); - bar_consumer.assert_next("bar/test", &broadcast2.consumer); + bar_consumer.assert_next("bar/test", &broadcast2.consume()); bar_consumer.assert_next_wait(); - foobar_consumer.assert_next("foo/test", &broadcast1.consumer); - foobar_consumer.assert_next("bar/test", &broadcast2.consumer); + foobar_consumer.assert_next("foo/test", &broadcast1.consume()); + foobar_consumer.assert_next("bar/test", &broadcast2.consume()); foobar_consumer.assert_next_wait(); } @@ -1140,14 +1108,14 @@ mod tests { let broadcast2 = Broadcast::produce(); // User with root "demo" allowed to subscribe to "worm-node" and "foobar" - let demo_producer = origin.producer.with_root("demo").expect("should create demo root"); + let demo_producer = origin.with_root("demo").expect("should create demo root"); let limited_producer = demo_producer .publish_only(&["worm-node".into(), "foobar".into()]) .expect("should create limited producer"); // Publish some broadcasts - assert!(limited_producer.publish_broadcast("worm-node/test", broadcast1.consumer.clone())); - assert!(limited_producer.publish_broadcast("foobar/test", broadcast2.consumer.clone())); + assert!(limited_producer.publish_broadcast("worm-node/test", broadcast1.consume())); + assert!(limited_producer.publish_broadcast("foobar/test", broadcast2.consume())); // consume_only with empty prefix should keep the exact same "worm-node" and "foobar" nodes let mut consumer = limited_producer @@ -1155,8 +1123,8 @@ mod tests { .expect("should create consumer with empty prefix"); // Should still see both broadcasts - consumer.assert_next("worm-node/test", &broadcast1.consumer); - consumer.assert_next("foobar/test", &broadcast2.consumer); + consumer.assert_next("worm-node/test", &broadcast1.consume()); + consumer.assert_next("foobar/test", &broadcast2.consume()); consumer.assert_next_wait(); } @@ -1168,15 +1136,15 @@ mod tests { let broadcast3 = Broadcast::produce(); // User with root "demo" allowed to subscribe to "worm-node" and "foobar" - let demo_producer = origin.producer.with_root("demo").expect("should create demo root"); + let demo_producer = origin.with_root("demo").expect("should create demo root"); let limited_producer = demo_producer .publish_only(&["worm-node".into(), "foobar".into()]) .expect("should create limited producer"); // Publish broadcasts at different levels - assert!(limited_producer.publish_broadcast("worm-node", broadcast1.consumer.clone())); - assert!(limited_producer.publish_broadcast("worm-node/foo", broadcast2.consumer.clone())); - assert!(limited_producer.publish_broadcast("foobar/bar", broadcast3.consumer.clone())); + assert!(limited_producer.publish_broadcast("worm-node", broadcast1.consume())); + assert!(limited_producer.publish_broadcast("worm-node/foo", broadcast2.consume())); + assert!(limited_producer.publish_broadcast("foobar/bar", broadcast3.consume())); // Test 1: consume_only("worm-node") should result in a single "" node with contents of "worm-node" ONLY let mut worm_consumer = limited_producer @@ -1184,8 +1152,8 @@ mod tests { .expect("should create worm-node consumer"); // Should see worm-node content with paths stripped to "" - worm_consumer.assert_next("worm-node", &broadcast1.consumer); - worm_consumer.assert_next("worm-node/foo", &broadcast2.consumer); + worm_consumer.assert_next("worm-node", &broadcast1.consume()); + worm_consumer.assert_next("worm-node/foo", &broadcast2.consume()); worm_consumer.assert_next_wait(); // Should NOT see foobar content // Test 2: consume_only("worm-node/foo") should result in a "" node with contents of "worm-node/foo" @@ -1193,7 +1161,7 @@ mod tests { .consume_only(&["worm-node/foo".into()]) .expect("should create worm-node/foo consumer"); - foo_consumer.assert_next("worm-node/foo", &broadcast2.consumer); + foo_consumer.assert_next("worm-node/foo", &broadcast2.consume()); foo_consumer.assert_next_wait(); // Should NOT see other content } @@ -1206,14 +1174,13 @@ mod tests { // Producer with multiple allowed roots let limited_producer = origin - .producer .publish_only(&["app1".into(), "app2".into(), "shared".into()]) .expect("should create limited producer"); // Publish to each root - assert!(limited_producer.publish_broadcast("app1/data", broadcast1.consumer.clone())); - assert!(limited_producer.publish_broadcast("app2/config", broadcast2.consumer.clone())); - assert!(limited_producer.publish_broadcast("shared/resource", broadcast3.consumer.clone())); + assert!(limited_producer.publish_broadcast("app1/data", broadcast1.consume())); + assert!(limited_producer.publish_broadcast("app2/config", broadcast2.consume())); + assert!(limited_producer.publish_broadcast("shared/resource", broadcast3.consume())); // consume_only with empty prefix should maintain all roots let mut consumer = limited_producer @@ -1221,9 +1188,9 @@ mod tests { .expect("should create consumer with empty prefix"); // Should see all broadcasts from all roots - consumer.assert_next("app1/data", &broadcast1.consumer); - consumer.assert_next("app2/config", &broadcast2.consumer); - consumer.assert_next("shared/resource", &broadcast3.consumer); + consumer.assert_next("app1/data", &broadcast1.consume()); + consumer.assert_next("app2/config", &broadcast2.consume()); + consumer.assert_next("shared/resource", &broadcast3.consume()); consumer.assert_next_wait(); } @@ -1234,7 +1201,6 @@ mod tests { // Producer with specific allowed paths let limited_producer = origin - .producer .publish_only(&["services/api".into(), "services/web".into()]) .expect("should create limited producer"); @@ -1244,10 +1210,10 @@ mod tests { .expect("should create producer with empty prefix"); // Should still have the same publishing restrictions - assert!(same_producer.publish_broadcast("services/api", broadcast.consumer.clone())); - assert!(same_producer.publish_broadcast("services/web", broadcast.consumer.clone())); - assert!(!same_producer.publish_broadcast("services/db", broadcast.consumer.clone())); - assert!(!same_producer.publish_broadcast("other", broadcast.consumer.clone())); + assert!(same_producer.publish_broadcast("services/api", broadcast.consume())); + assert!(same_producer.publish_broadcast("services/web", broadcast.consume())); + assert!(!same_producer.publish_broadcast("services/db", broadcast.consume())); + assert!(!same_producer.publish_broadcast("other", broadcast.consume())); } #[tokio::test] @@ -1259,21 +1225,20 @@ mod tests { // Producer with broad permission let limited_producer = origin - .producer .publish_only(&["org".into()]) .expect("should create limited producer"); // Publish at various depths - assert!(limited_producer.publish_broadcast("org/team1/project1", broadcast1.consumer.clone())); - assert!(limited_producer.publish_broadcast("org/team1/project2", broadcast2.consumer.clone())); - assert!(limited_producer.publish_broadcast("org/team2/project1", broadcast3.consumer.clone())); + assert!(limited_producer.publish_broadcast("org/team1/project1", broadcast1.consume())); + assert!(limited_producer.publish_broadcast("org/team1/project2", broadcast2.consume())); + assert!(limited_producer.publish_broadcast("org/team2/project1", broadcast3.consume())); // Narrow down to team1 only let mut team1_consumer = limited_producer .consume_only(&["org/team2".into()]) .expect("should create team1 consumer"); - team1_consumer.assert_next("org/team2/project1", &broadcast3.consumer); + team1_consumer.assert_next("org/team2/project1", &broadcast3.consume()); team1_consumer.assert_next_wait(); // Should NOT see team1 content // Further narrow down to team1/project1 @@ -1282,7 +1247,7 @@ mod tests { .expect("should create project1 consumer"); // Should only see project1 content at root - project1_consumer.assert_next("org/team1/project1", &broadcast1.consumer); + project1_consumer.assert_next("org/team1/project1", &broadcast1.consume()); project1_consumer.assert_next_wait(); } @@ -1292,7 +1257,6 @@ mod tests { // Producer with specific allowed paths let limited_producer = origin - .producer .publish_only(&["allowed/path".into()]) .expect("should create limited producer"); @@ -1310,14 +1274,14 @@ mod tests { let broadcast2 = Broadcast::produce(); // Setup: user with root "demo" allowed to subscribe to specific paths - let demo_producer = origin.producer.with_root("demo").expect("should create demo root"); + let demo_producer = origin.with_root("demo").expect("should create demo root"); let user_producer = demo_producer .publish_only(&["worm-node".into(), "foobar".into()]) .expect("should create user producer"); // Publish some data - assert!(user_producer.publish_broadcast("worm-node/data", broadcast1.consumer.clone())); - assert!(user_producer.publish_broadcast("foobar", broadcast2.consumer.clone())); + assert!(user_producer.publish_broadcast("worm-node/data", broadcast1.consume())); + assert!(user_producer.publish_broadcast("foobar", broadcast2.consume())); // Key test: consume_only with "" should maintain access to allowed roots let mut consumer = user_producer @@ -1325,8 +1289,8 @@ mod tests { .expect("consume_only with empty prefix should not fail when user has specific permissions"); // Should still receive broadcasts from allowed paths - consumer.assert_next("worm-node/data", &broadcast1.consumer); - consumer.assert_next("foobar", &broadcast2.consumer); + consumer.assert_next("worm-node/data", &broadcast1.consume()); + consumer.assert_next("foobar", &broadcast2.consume()); consumer.assert_next_wait(); // Also test that we can still narrow the scope @@ -1334,7 +1298,7 @@ mod tests { .consume_only(&["worm-node".into()]) .expect("should be able to narrow scope to worm-node"); - narrow_consumer.assert_next("worm-node/data", &broadcast1.consumer); + narrow_consumer.assert_next("worm-node/data", &broadcast1.consume()); narrow_consumer.assert_next_wait(); // Should not see foobar } } diff --git a/rs/moq-lite/src/model/produce.rs b/rs/moq-lite/src/model/produce.rs deleted file mode 100644 index 0c2520649..000000000 --- a/rs/moq-lite/src/model/produce.rs +++ /dev/null @@ -1,10 +0,0 @@ -/// A named tuple of a producer and consumer for convenience. -/// -/// The producer and consumer may each be cloned as many times as you want. -/// However when the number of references reaches zero, the other will receive a signal to close. -/// A new consumer may be created at any time by calling the producer's `consume()` method. -#[derive(Clone)] -pub struct Produce { - pub producer: P, - pub consumer: C, -} diff --git a/rs/moq-lite/src/model/track.rs b/rs/moq-lite/src/model/track.rs index b90078474..8ba18bc2e 100644 --- a/rs/moq-lite/src/model/track.rs +++ b/rs/moq-lite/src/model/track.rs @@ -14,7 +14,7 @@ use tokio::sync::watch; -use crate::{Error, Produce, Result}; +use crate::{Error, Result}; use super::{Group, GroupConsumer, GroupProducer}; @@ -35,10 +35,8 @@ impl Track { } } - pub fn produce(self) -> Produce { - let producer = TrackProducer::new(self); - let consumer = producer.consume(); - Produce { producer, consumer } + pub fn produce(self) -> TrackProducer { + TrackProducer::new(self) } } @@ -86,7 +84,7 @@ impl TrackProducer { /// If the sequence number is not the latest, this method will return None. pub fn create_group(&mut self, info: Group) -> Option { let group = info.produce(); - self.insert_group(group.consumer).then_some(group.producer) + self.insert_group(group.consume()).then_some(group) } /// Create a new group with the next sequence number. @@ -98,8 +96,8 @@ impl TrackProducer { let sequence = state.latest.as_ref().map_or(0, |group| group.info.sequence + 1); let group = Group { sequence }.produce(); - state.latest = Some(group.consumer); - producer = Some(group.producer); + state.latest = Some(group.consume()); + producer = Some(group); true }); diff --git a/rs/moq-native/examples/chat.rs b/rs/moq-native/examples/chat.rs index b73f4d7a3..1bd1cb9a9 100644 --- a/rs/moq-native/examples/chat.rs +++ b/rs/moq-native/examples/chat.rs @@ -7,13 +7,14 @@ async fn main() -> anyhow::Result<()> { // Create an origin that we can publish to and the session can consume from. let origin = moq_lite::Origin::produce(); + let consumer = origin.consume(); // Run the broadcast production and the session in parallel. // This is a simple example of how you can concurrently run multiple tasks. // tokio::spawn works too. tokio::select! { - res = run_broadcast(origin.producer) => res, - res = run_session(origin.consumer) => res, + res = run_broadcast(origin) => res, + res = run_session(consumer) => res, } } @@ -41,7 +42,7 @@ async fn run_broadcast(origin: moq_lite::OriginProducer) -> anyhow::Result<()> { // Create a track that we'll insert into the broadcast. // A track is a series of groups representing a live stream. - let mut track = broadcast.producer.create_track(moq_lite::Track { + let mut track = broadcast.create_track(moq_lite::Track { name: "chat".to_string(), priority: 0, }); @@ -49,7 +50,7 @@ async fn run_broadcast(origin: moq_lite::OriginProducer) -> anyhow::Result<()> { // NOTE: The path is empty because we're using the URL to scope the broadcast. // If you put "alice" here, it would be published as "anon/chat-example/alice". // OPTIONAL: We publish after inserting the track just to avoid a nearly impossible race condition. - origin.publish_broadcast("", broadcast.consumer); + origin.publish_broadcast("", broadcast.consume()); // Create a group. // Each group is independent and the newest group(s) will be prioritized. diff --git a/rs/moq-relay/src/cluster.rs b/rs/moq-relay/src/cluster.rs index ce9498ae8..e749ef495 100644 --- a/rs/moq-relay/src/cluster.rs +++ b/rs/moq-relay/src/cluster.rs @@ -57,16 +57,16 @@ pub struct Cluster { client: moq_native::Client, // Advertises ourselves as an origin to other nodes. - noop: moq_lite::Produce, + noop: BroadcastProducer, // Broadcasts announced by local clients (users). - pub primary: Arc>, + pub primary: Arc, // Broadcasts announced by remote servers (cluster). - pub secondary: Arc>, + pub secondary: Arc, // Broadcasts announced by local clients and remote servers. - pub combined: Arc>, + pub combined: Arc, } impl Cluster { @@ -92,7 +92,7 @@ impl Cluster { }; // Scope the origin to our root. - let subscribe_origin = subscribe_origin.producer.with_root(&token.root)?; + let subscribe_origin = subscribe_origin.with_root(&token.root)?; subscribe_origin.consume_only(&token.subscribe) } @@ -104,15 +104,15 @@ impl Cluster { false => &self.primary, }; - let publish_origin = publish_origin.producer.with_root(&token.root)?; + let publish_origin = publish_origin.with_root(&token.root)?; publish_origin.publish_only(&token.publish) } pub fn get(&self, broadcast: &str) -> Option { self.primary - .consumer + .consume() .consume_broadcast(broadcast) - .or_else(|| self.secondary.consumer.consume_broadcast(broadcast)) + .or_else(|| self.secondary.consume().consume_broadcast(broadcast)) } pub async fn run(self) -> anyhow::Result<()> { @@ -133,14 +133,13 @@ impl Cluster { // Use with_root to automatically strip the prefix from announced paths. let origins = self .secondary - .producer .with_root(&self.config.prefix) .context("no authorized origins")?; // Announce ourselves as an origin to the root node. if let Some(myself) = self.config.node.as_ref() { tracing::info!(%myself, "announcing as leaf"); - origins.publish_broadcast(myself, self.noop.consumer.clone()); + origins.publish_broadcast(myself, self.noop.consume()); } // If the token is provided, read it from the disk and use it in the query parameter. @@ -150,7 +149,7 @@ impl Cluster { None => "".to_string(), }; - let noop = self.noop.consumer.clone(); + let noop = self.noop.consume(); // Despite returning a Result, we should NEVER return an Ok tokio::select! { @@ -171,8 +170,8 @@ impl Cluster { // Shovel broadcasts from the primary and secondary origins into the combined origin. async fn run_combined(self) -> anyhow::Result<()> { - let mut primary = self.primary.consumer.consume(); - let mut secondary = self.secondary.consumer.consume(); + let mut primary = self.primary.consume(); + let mut secondary = self.secondary.consume(); loop { let (name, broadcast) = tokio::select! { @@ -183,7 +182,7 @@ impl Cluster { }; if let Some(broadcast) = broadcast { - self.combined.producer.publish_broadcast(&name, broadcast); + self.combined.publish_broadcast(&name, broadcast); } } } @@ -268,8 +267,8 @@ impl Cluster { let session = self .client .clone() - .with_publish(self.primary.consumer.consume()) - .with_consume(self.secondary.producer.clone()) + .with_publish(self.primary.consume()) + .with_consume((*self.secondary).clone()) .connect(url.clone()) .await .context("failed to connect to remote")?;