Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions rs/hang-cli/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ use url::Url;
pub async fn run_client(client: moq_native::Client, url: Url, name: String, publish: Publish) -> anyhow::Result<()> {
// Create an origin producer to publish to the broadcast.
let origin = moq_lite::Origin::produce();
origin.producer.publish_broadcast(&name, publish.consume());
origin.publish_broadcast(&name, publish.consume());

tracing::info!(%url, %name, "connecting");

// Establish the connection, not providing a subscriber.
let session = client.with_publish(origin.consumer).connect(url).await?;
let session = client.with_publish(origin.consume()).connect(url).await?;

#[cfg(unix)]
// Notify systemd that we're ready.
Expand Down
4 changes: 2 additions & 2 deletions rs/hang-cli/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@ async fn run_session(
) -> anyhow::Result<()> {
// Create an origin producer to publish to the broadcast.
let origin = moq_lite::Origin::produce();
origin.producer.publish_broadcast(&name, consumer);
origin.publish_broadcast(&name, consumer);

// Blindly accept the session (WebTransport or QUIC), regardless of the URL.
let session = session.with_publish(origin.consumer).accept().await?;
let session = session.with_publish(origin.consume()).accept().await?;

tracing::info!(id, "accepted session");

Expand Down
11 changes: 6 additions & 5 deletions rs/hang/examples/video.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@ async fn main() -> anyhow::Result<()> {

// Create an origin that we can publish to and the session can consume from.
let origin = moq_lite::Origin::produce();
let consumer = origin.consume();

// Run the broadcast production and the session in parallel.
// This is a simple example of how you can concurrently run multiple tasks.
// tokio::spawn works too.
tokio::select! {
res = run_broadcast(origin.producer) => res,
res = run_session(origin.consumer) => res,
res = run_broadcast(origin) => res,
res = run_session(consumer) => res,
}
}

Expand Down Expand Up @@ -93,7 +94,7 @@ fn create_track(broadcast: &mut moq_lite::BroadcastProducer) -> hang::TrackProdu
.produce();

// Publish the catalog track to the broadcast.
broadcast.insert_track(catalog.consumer.track);
broadcast.insert_track(catalog.track.consume());

// Actually create the media track now.
let track = broadcast.create_track(video_track);
Expand All @@ -106,11 +107,11 @@ fn create_track(broadcast: &mut moq_lite::BroadcastProducer) -> hang::TrackProdu
async fn run_broadcast(origin: moq_lite::OriginProducer) -> anyhow::Result<()> {
// Create and publish a broadcast to the origin.
let mut broadcast = moq_lite::Broadcast::produce();
let mut track = create_track(&mut broadcast.producer);
let mut track = create_track(&mut broadcast);

// NOTE: The path is empty because we're using the URL to scope the broadcast.
// OPTIONAL: We publish after inserting the tracks just to avoid a nearly impossible race condition.
origin.publish_broadcast("", broadcast.consumer);
origin.publish_broadcast("", broadcast.consume());

// Not real frames of course.
track.write(hang::Frame {
Expand Down
9 changes: 2 additions & 7 deletions rs/hang/src/catalog/root.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use serde::{Deserialize, Serialize};

use crate::Result;
use crate::catalog::{Audio, AudioConfig, Chat, User, Video, VideoConfig};
use moq_lite::Produce;

/// A catalog track, created by a broadcaster to describe the tracks available in a broadcast.
#[serde_with::serde_as]
Expand Down Expand Up @@ -83,13 +82,9 @@ impl Catalog {
}

/// Produce a catalog track that describes the available media tracks.
pub fn produce(self) -> Produce<CatalogProducer, CatalogConsumer> {
pub fn produce(self) -> CatalogProducer {
let track = Catalog::default_track().produce();

Produce {
producer: CatalogProducer::new(track.producer, self),
consumer: track.consumer.into(),
}
CatalogProducer::new(track, self)
}

pub fn default_track() -> moq_lite::Track {
Expand Down
6 changes: 3 additions & 3 deletions rs/hang/src/import/aac.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,13 +114,13 @@ impl Aac {
tracing::debug!(name = ?track.name, ?config, "starting track");

let track = track.produce();
self.broadcast.insert_track(track.consumer);
self.broadcast.insert_track(track.consume());

let mut catalog = self.broadcast.catalog.lock();
let audio = catalog.insert_audio(track.producer.info.name.clone(), config);
let audio = catalog.insert_audio(track.info.name.clone(), config);
audio.priority = 2;

self.track = Some(track.producer);
self.track = Some(track);

Ok(())
}
Expand Down
4 changes: 2 additions & 2 deletions rs/hang/src/import/avc3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,10 @@ impl Avc3 {
}

let track = track.produce();
self.broadcast.insert_track(track.consumer);
self.broadcast.insert_track(track.consume());

self.config = Some(config);
self.track = Some(track.producer.into());
self.track = Some(track.into());

Ok(())
}
Expand Down
4 changes: 2 additions & 2 deletions rs/hang/src/import/hev1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,10 @@ impl Hev1 {
}

let track = track.produce();
self.broadcast.insert_track(track.consumer);
self.broadcast.insert_track(track.consume());

self.config = Some(config);
self.track = Some(track.producer.into());
self.track = Some(track.into());

Ok(())
}
Expand Down
2 changes: 1 addition & 1 deletion rs/hang/src/import/hls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -626,7 +626,7 @@ mod tests {

#[test]
fn hls_ingest_starts_without_importers() {
let broadcast = moq_lite::Broadcast::produce().producer.into();
let broadcast = moq_lite::Broadcast::produce().into();
let url = "https://example.com/master.m3u8".to_string();
let cfg = HlsConfig::new(url);
let hls = Hls::new(broadcast, cfg).unwrap();
Expand Down
6 changes: 3 additions & 3 deletions rs/hang/src/import/opus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,13 @@ impl Opus {
tracing::debug!(name = ?track.name, ?config, "starting track");

let track = track.produce();
self.broadcast.insert_track(track.consumer);
self.broadcast.insert_track(track.consume());

let mut catalog = self.broadcast.catalog.lock();
let audio = catalog.insert_audio(track.producer.info.name.clone(), config);
let audio = catalog.insert_audio(track.info.name.clone(), config);
audio.priority = 2;

self.track = Some(track.producer);
self.track = Some(track);

Ok(())
}
Expand Down
4 changes: 2 additions & 2 deletions rs/hang/src/model/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ pub struct BroadcastProducer {
impl BroadcastProducer {
pub fn new(mut inner: moq_lite::BroadcastProducer) -> Self {
let catalog = Catalog::default().produce();
inner.insert_track(catalog.consumer.track);
inner.insert_track(catalog.track.consume());

Self {
inner,
catalog: catalog.producer,
catalog,
track_id: Default::default(),
}
}
Expand Down
10 changes: 5 additions & 5 deletions rs/moq-clock/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,20 +63,20 @@ async fn main() -> anyhow::Result<()> {
match config.role {
Command::Publish => {
let mut broadcast = moq_lite::Broadcast::produce();
let track = broadcast.producer.create_track(track);
let track = broadcast.create_track(track);
let clock = clock::Publisher::new(track);

origin.producer.publish_broadcast(&config.broadcast, broadcast.consumer);
origin.publish_broadcast(&config.broadcast, broadcast.consume());

let session = client.with_publish(origin.consumer).connect(config.url).await?;
let session = client.with_publish(origin.consume()).connect(config.url).await?;

tokio::select! {
res = session.closed() => res.context("session closed"),
_ = clock.run() => Ok(()),
}
}
Command::Subscribe => {
let session = client.with_consume(origin.producer).connect(config.url).await?;
let session = client.with_consume(origin.clone()).connect(config.url).await?;

// NOTE: We could just call `session.consume_broadcast(&config.broadcast)` instead,
// However that won't work with IETF MoQ and the current OriginConsumer API the moment.
Expand All @@ -86,7 +86,7 @@ async fn main() -> anyhow::Result<()> {

let path: moq_lite::Path<'_> = config.broadcast.into();
let mut origin = origin
.consumer
.consume()
.consume_only(&[path])
.context("not allowed to consume broadcast")?;

Expand Down
2 changes: 1 addition & 1 deletion rs/moq-lite/src/ietf/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ pub(super) struct Publisher<S: web_transport_trait::Session> {
impl<S: web_transport_trait::Session> Publisher<S> {
pub fn new(session: S, origin: Option<OriginConsumer>, control: Control, version: Version) -> Self {
// Default to a dummy origin that is immediately closed.
let origin = origin.unwrap_or_else(|| Origin::produce().consumer);
let origin = origin.unwrap_or_else(|| Origin::produce().consume());
Self {
session,
origin,
Expand Down
10 changes: 5 additions & 5 deletions rs/moq-lite/src/ietf/subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,12 @@ impl<S: web_transport_trait::Session> Subscriber<S> {
}
Entry::Vacant(entry) => {
let broadcast = Broadcast::produce();
origin.publish_broadcast(path.clone(), broadcast.consumer);
origin.publish_broadcast(path.clone(), broadcast.consume());
entry.insert(BroadcastState {
producer: broadcast.producer.clone(),
producer: broadcast.clone(),
count: 1,
});
broadcast.producer
broadcast
}
};

Expand Down Expand Up @@ -451,7 +451,7 @@ impl<S: web_transport_trait::Session> Subscriber<S> {
match state.subscribes.entry(request_id) {
Entry::Vacant(entry) => {
entry.insert(TrackState {
producer: track.producer,
producer: track.clone(),
alias: Some(msg.track_alias),
});
}
Expand All @@ -466,7 +466,7 @@ impl<S: web_transport_trait::Session> Subscriber<S> {
// NOTE: This is debated in the IETF draft, but is significantly easier to implement.
let mut broadcast = self.start_announce(msg.track_namespace.to_owned())?;

let exists = broadcast.insert_track(track.consumer);
let exists = broadcast.insert_track(track.consume());
if exists {
tracing::warn!(track = %msg.track_name, "track already exists, replacing it");
}
Expand Down
2 changes: 1 addition & 1 deletion rs/moq-lite/src/lite/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ pub(super) struct Publisher<S: web_transport_trait::Session> {
impl<S: web_transport_trait::Session> Publisher<S> {
pub fn new(session: S, origin: Option<OriginConsumer>, version: Version) -> Self {
// Default to a dummy origin that is immediately closed.
let origin = origin.unwrap_or_else(|| Origin::produce().consumer);
let origin = origin.unwrap_or_else(|| Origin::produce().consume());
Self {
session,
origin,
Expand Down
6 changes: 3 additions & 3 deletions rs/moq-lite/src/lite/subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,16 +134,16 @@ impl<S: web_transport_trait::Session> Subscriber<S> {
// Make sure the peer doesn't double announce.
match producers.entry(path.to_owned()) {
Entry::Occupied(_) => return Err(Error::Duplicate),
Entry::Vacant(entry) => entry.insert(broadcast.producer.clone()),
Entry::Vacant(entry) => entry.insert(broadcast.clone()),
};

// Run the broadcast in the background until all consumers are dropped.
self.origin
.as_mut()
.unwrap()
.publish_broadcast(path.clone(), broadcast.consumer);
.publish_broadcast(path.clone(), broadcast.consume());

web_async::spawn(self.clone().run_broadcast(path, broadcast.producer));
web_async::spawn(self.clone().run_broadcast(path, broadcast));

Ok(())
}
Expand Down
Loading
Loading