From b3e21c48e63f09cd242b5815cda89ee5ab25bae2 Mon Sep 17 00:00:00 2001 From: Luke Curley Date: Tue, 27 Jan 2026 19:13:01 -0300 Subject: [PATCH 1/5] Add a 30s back-buffer for groups. And a way to fetch them. --- rs/moq-lite/src/model/group.rs | 20 +++++ rs/moq-lite/src/model/track.rs | 107 +++++++++++++++++------ rs/moq-relay/src/auth.rs | 2 +- rs/moq-relay/src/web.rs | 151 ++++++++++++++++++++++++++------- 4 files changed, 224 insertions(+), 56 deletions(-) diff --git a/rs/moq-lite/src/model/group.rs b/rs/moq-lite/src/model/group.rs index 1bd440769..7e935ddeb 100644 --- a/rs/moq-lite/src/model/group.rs +++ b/rs/moq-lite/src/model/group.rs @@ -217,4 +217,24 @@ impl GroupConsumer { } } } + + pub async fn get_frame(&self, index: usize) -> Result> { + let mut state = self.state.clone(); + let Ok(state) = state + .wait_for(|state| index < state.frames.len() || state.closed.is_some()) + .await + else { + return Err(Error::Cancel); + }; + + if let Some(frame) = state.frames.get(index).cloned() { + return Ok(Some(frame)); + } + + match &state.closed { + Some(Ok(_)) => Ok(None), + Some(Err(err)) => Err(err.clone()), + _ => unreachable!(), + } + } } diff --git a/rs/moq-lite/src/model/track.rs b/rs/moq-lite/src/model/track.rs index b90078474..79323635a 100644 --- a/rs/moq-lite/src/model/track.rs +++ b/rs/moq-lite/src/model/track.rs @@ -18,7 +18,9 @@ use crate::{Error, Produce, Result}; use super::{Group, GroupConsumer, GroupProducer}; -use std::{cmp::Ordering, future::Future}; +use std::{collections::VecDeque, future::Future}; + +const MAX_CACHE: std::time::Duration = std::time::Duration::from_secs(30); #[derive(Clone, Debug, PartialEq, Eq)] #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] @@ -44,8 +46,28 @@ impl Track { #[derive(Default)] struct TrackState { - latest: Option, + groups: VecDeque<(tokio::time::Instant, GroupConsumer)>, closed: Option>, + offset: usize, + + max_sequence: Option, + + // The largest sequence number that has been dropped. + drop_sequence: Option, +} + +impl TrackState { + fn trim(&mut self, now: tokio::time::Instant) { + while let Some((timestamp, _)) = self.groups.front() { + if now.saturating_duration_since(*timestamp) > MAX_CACHE { + let (_, group) = self.groups.pop_front().unwrap(); + self.drop_sequence = Some(self.drop_sequence.unwrap_or(0).max(group.info.sequence)); + self.offset += 1; + } else { + break; + } + } + } } /// A producer for a track, used to create new groups. @@ -67,16 +89,10 @@ impl TrackProducer { pub fn insert_group(&mut self, group: GroupConsumer) -> bool { self.state.send_if_modified(|state| { assert!(state.closed.is_none()); - - if let Some(latest) = &state.latest { - match group.info.cmp(&latest.info) { - Ordering::Less => return false, - Ordering::Equal => return false, - Ordering::Greater => (), - } - } - - state.latest = Some(group.clone()); + let now = tokio::time::Instant::now(); + state.trim(now); + state.groups.push_back((now, group.clone())); + state.max_sequence = Some(state.max_sequence.unwrap_or(0).max(group.info.sequence)); true }) } @@ -96,11 +112,15 @@ impl TrackProducer { self.state.send_if_modified(|state| { assert!(state.closed.is_none()); - let sequence = state.latest.as_ref().map_or(0, |group| group.info.sequence + 1); + let now = tokio::time::Instant::now(); + state.trim(now); + + let sequence = state.max_sequence.map_or(0, |sequence| sequence + 1); let group = Group { sequence }.produce(); - state.latest = Some(group.consumer); - producer = Some(group.producer); + state.groups.push_back((now, group.consumer)); + state.max_sequence = Some(sequence); + producer = Some(group.producer); true }); @@ -127,7 +147,7 @@ impl TrackProducer { TrackConsumer { info: self.info.clone(), state: self.state.subscribe(), - prev: None, + index: 0, } } @@ -156,7 +176,7 @@ impl From for TrackProducer { pub struct TrackConsumer { pub info: Track, state: watch::Receiver, - prev: Option, // The previous sequence number + index: usize, } impl TrackConsumer { @@ -168,24 +188,61 @@ impl TrackConsumer { let Ok(state) = self .state .wait_for(|state| { - state.latest.as_ref().map(|group| group.info.sequence) > self.prev || state.closed.is_some() + let index = self.index.saturating_sub(state.offset); + state.groups.get(index).is_some() || state.closed.is_some() }) .await else { return Err(Error::Cancel); }; + let index = self.index.saturating_sub(state.offset); + if let Some(group) = state.groups.get(index) { + self.index = state.offset + index + 1; + return Ok(Some(group.1.clone())); + } + match &state.closed { - Some(Ok(_)) => return Ok(None), - Some(Err(err)) => return Err(err.clone()), - _ => {} + Some(Ok(_)) => Ok(None), + Some(Err(err)) => Err(err.clone()), + _ => unreachable!(), } + } + + /// Block until the group is available. + /// + /// NOTE: This can block indefinitely if the requested group is dropped. + pub async fn get_group(&self, sequence: u64) -> Result> { + let mut state = self.state.clone(); - // If there's a new latest group, return it. - let group = state.latest.clone().unwrap(); - self.prev = Some(group.info.sequence); + let Ok(state) = state + .wait_for(|state| { + if state.closed.is_some() { + return true; + } - Ok(Some(group)) + if let Some(drop_sequence) = state.drop_sequence + && drop_sequence >= sequence + { + return true; + } + + state.groups.iter().any(|(_, group)| group.info.sequence == sequence) + }) + .await + else { + return Err(Error::Cancel); + }; + + if let Some((_, group)) = state.groups.iter().find(|(_, group)| group.info.sequence == sequence) { + return Ok(Some(group.clone())); + } + + match &state.closed { + Some(Ok(_)) => Ok(None), // end of stream + Some(Err(err)) => Err(err.clone()), + None => Ok(None), // Dropped + } } /// Block until the track is closed. diff --git a/rs/moq-relay/src/auth.rs b/rs/moq-relay/src/auth.rs index b011c97a8..cb4c7f701 100644 --- a/rs/moq-relay/src/auth.rs +++ b/rs/moq-relay/src/auth.rs @@ -68,7 +68,7 @@ pub struct AuthToken { pub cluster: bool, } -const REFRESH_ERROR_INTERVAL: Duration = Duration::from_mins(5); +const REFRESH_ERROR_INTERVAL: Duration = Duration::from_secs(300); #[derive(Clone)] pub struct Auth { diff --git a/rs/moq-relay/src/web.rs b/rs/moq-relay/src/web.rs index 620fa9bbb..81534f97f 100644 --- a/rs/moq-relay/src/web.rs +++ b/rs/moq-relay/src/web.rs @@ -22,18 +22,12 @@ use axum::{ use bytes::Bytes; use clap::Parser; use moq_lite::{OriginConsumer, OriginProducer}; -use serde::{Deserialize, Serialize}; use std::future::Future; use tower_http::cors::{Any, CorsLayer}; use crate::{Auth, Cluster}; -#[derive(Debug, Deserialize)] -struct Params { - jwt: Option, -} - -#[derive(Parser, Clone, Debug, Deserialize, Serialize, Default)] +#[derive(Parser, Clone, Debug, serde::Deserialize, serde::Serialize, Default)] #[serde(deny_unknown_fields, default)] pub struct WebConfig { #[command(flatten)] @@ -166,10 +160,70 @@ async fn serve_fingerprint(State(state): State>) -> String { .clone() } +#[derive(Debug, serde::Deserialize)] +struct AuthParams { + jwt: Option, +} + +#[derive(Debug, serde::Deserialize)] +struct FetchParams { + #[serde(flatten)] + auth: AuthParams, + + #[serde(default)] + group: FetchGroup, + + #[serde(default)] + frame: FetchFrame, +} + +#[derive(Debug, Default)] +enum FetchGroup { + // Return the group at the given sequence number. + Num(u64), + + // Return the latest group. + #[default] + Latest, +} + +impl<'de> serde::Deserialize<'de> for FetchGroup { + fn deserialize>(deserializer: D) -> Result { + let s = String::deserialize(deserializer)?; + if let Ok(num) = s.parse::() { + Ok(FetchGroup::Num(num)) + } else if s == "latest" { + Ok(FetchGroup::Latest) + } else { + Err(serde::de::Error::custom(format!("invalid group value: {s}"))) + } + } +} + +#[derive(Debug, Default)] +enum FetchFrame { + Num(usize), + #[default] + Chunked, +} + +impl<'de> serde::Deserialize<'de> for FetchFrame { + fn deserialize>(deserializer: D) -> Result { + let s = String::deserialize(deserializer)?; + if let Ok(num) = s.parse::() { + Ok(FetchFrame::Num(num)) + } else if s == "chunked" { + Ok(FetchFrame::Chunked) + } else { + Err(serde::de::Error::custom(format!("invalid frame value: {s}"))) + } + } +} + async fn serve_ws( ws: WebSocketUpgrade, Path(path): Path, - Query(params): Query, + Query(params): Query, State(state): State>, ) -> axum::response::Result { let ws = ws.protocols(["webtransport"]); @@ -229,7 +283,7 @@ where /// Serve the announced broadcasts for a given prefix. async fn serve_announced( path: Option>, - Query(params): Query, + Query(params): Query, State(state): State>, ) -> axum::response::Result { let prefix = match path { @@ -253,10 +307,10 @@ async fn serve_announced( Ok(broadcasts.iter().map(|p| p.to_string()).collect::>().join("\n")) } -/// Serve the latest group for a given track +/// Serve the given group for a given track async fn serve_fetch( Path(path): Path, - Query(params): Query, + Query(params): Query, State(state): State>, ) -> axum::response::Result { // The path containts a broadcast/track @@ -269,7 +323,7 @@ async fn serve_fetch( } let broadcast = path.join("/"); - let token = state.auth.verify(&broadcast, params.jwt.as_deref())?; + let token = state.auth.verify(&broadcast, params.auth.jwt.as_deref())?; let Some(origin) = state.cluster.subscriber(&token) else { return Err(StatusCode::UNAUTHORIZED.into()); @@ -286,39 +340,76 @@ async fn serve_fetch( let broadcast = origin.consume_broadcast("").ok_or(StatusCode::NOT_FOUND)?; let mut track = broadcast.subscribe_track(&track); - let Ok(group) = track.next_group().await else { - return Err(StatusCode::INTERNAL_SERVER_ERROR.into()); - }; - let Some(group) = group else { - return Err(StatusCode::NOT_FOUND.into()); - }; + let deadline = tokio::time::Instant::now() + tokio::time::Duration::from_secs(30); + + let result = tokio::time::timeout_at(deadline, async { + let group = match params.group { + FetchGroup::Latest => track.next_group().await, + FetchGroup::Num(sequence) => track.get_group(sequence).await, + }; + + let group = match group { + Ok(Some(group)) => group, + Ok(None) => return Err(StatusCode::NOT_FOUND), + Err(_) => return Err(StatusCode::INTERNAL_SERVER_ERROR), + }; + + tracing::info!(track = %track.info.name, group = %group.info.sequence, "serving group"); + + match params.frame { + FetchFrame::Num(index) => match group.get_frame(index).await { + Ok(Some(frame)) => Ok(ServeGroup { + group: None, + frame: Some(frame), + deadline, + }), + Ok(None) => Err(StatusCode::NOT_FOUND), + Err(_) => Err(StatusCode::INTERNAL_SERVER_ERROR), + }, + FetchFrame::Chunked => Ok(ServeGroup { + group: Some(group), + frame: None, + deadline, + }), + } + }) + .await; - Ok(ServeGroup::new(group)) + match result { + Ok(Ok(serve)) => Ok(serve), + Ok(Err(status)) => Err(status.into()), + Err(_) => Err(StatusCode::GATEWAY_TIMEOUT.into()), + } } struct ServeGroup { - group: moq_lite::GroupConsumer, + group: Option, frame: Option, + deadline: tokio::time::Instant, } impl ServeGroup { - fn new(group: moq_lite::GroupConsumer) -> Self { - Self { group, frame: None } - } - async fn next(&mut self) -> moq_lite::Result> { - loop { + while self.group.is_some() || self.frame.is_some() { if let Some(frame) = self.frame.as_mut() { - let data = frame.read_all().await?; + let data = tokio::time::timeout_at(self.deadline, frame.read_all()) + .await + .map_err(|_| moq_lite::Error::Timeout)?; self.frame.take(); - return Ok(Some(data)); + return Ok(Some(data?)); } - self.frame = self.group.next_frame().await?; - if self.frame.is_none() { - return Ok(None); + if let Some(group) = self.group.as_mut() { + self.frame = tokio::time::timeout_at(self.deadline, group.next_frame()) + .await + .map_err(|_| moq_lite::Error::Timeout)??; + if self.frame.is_none() { + self.group.take(); + } } } + + Ok(None) } } From df2c12af8df1e620024226b7fb404b99dc415aa6 Mon Sep 17 00:00:00 2001 From: Luke Curley Date: Tue, 27 Jan 2026 19:25:14 -0300 Subject: [PATCH 2/5] Serve multiple groups in parallel. TODO Add a timeout. --- rs/moq-lite/src/lite/publisher.rs | 68 +++++-------------------------- 1 file changed, 10 insertions(+), 58 deletions(-) diff --git a/rs/moq-lite/src/lite/publisher.rs b/rs/moq-lite/src/lite/publisher.rs index d32b3aca8..f271064f7 100644 --- a/rs/moq-lite/src/lite/publisher.rs +++ b/rs/moq-lite/src/lite/publisher.rs @@ -1,5 +1,6 @@ use std::sync::Arc; +use futures::{FutureExt, StreamExt, stream::FuturesUnordered}; use web_async::FuturesExt; use crate::{ @@ -215,49 +216,22 @@ impl Publisher { priority: PriorityQueue, version: Version, ) -> Result<(), Error> { - // TODO use a BTreeMap serve the latest N groups by sequence. - // Until then, we'll implement N=2 manually. - // Also, this is more complicated because we can't use tokio because of WASM. - // We need to drop futures in order to cancel them and keep polling them with select! - let mut old_group = None; - let mut new_group = None; - - // Annoying that we can't use a tuple here as we need the compiler to infer the type. - // Otherwise we'd have to pick Send or !Send... - let mut old_sequence = None; - let mut new_sequence = None; - - // Keep reading groups from the track, some of which may arrive out of order. + let mut tasks = FuturesUnordered::new(); + loop { let group = tokio::select! { biased; Some(group) = track.next_group().transpose() => group, - Some(_) = async { Some(old_group.as_mut()?.await) } => { - old_group = None; - old_sequence = None; - continue; - }, - Some(_) = async { Some(new_group.as_mut()?.await) } => { - new_group = old_group; - new_sequence = old_sequence; - old_group = None; - old_sequence = None; - continue; - }, + // Poll all active group futures; never matches but keeps them running. + true = async { + while tasks.next().await.is_some() {} + false + } => unreachable!(), else => return Ok(()), }?; let sequence = group.info.sequence; - let latest = new_sequence.as_ref().unwrap_or(&0); - - tracing::debug!(subscribe = %subscribe.id, track = %track.info.name, sequence, latest, "serving group"); - - // If this group is older than the oldest group we're serving, skip it. - // We always serve at most two groups, but maybe we should serve only sequence >= MAX-1. - if sequence < *old_sequence.as_ref().unwrap_or(&0) { - tracing::debug!(subscribe = %subscribe.id, track = %track.info.name, old = %sequence, %latest, "skipping group"); - continue; - } + tracing::debug!(subscribe = %subscribe.id, track = %track.info.name, sequence, "serving group"); let msg = lite::Group { subscribe: subscribe.id, @@ -265,29 +239,7 @@ impl Publisher { }; let priority = priority.insert(track.info.priority, sequence); - - // Spawn a task to serve this group, ignoring any errors because they don't really matter. - // TODO add some logging at least. - let handle = Box::pin(Self::serve_group(session.clone(), msg, priority, group, version)); - - // Terminate the old group if it's still running. - if let Some(old_sequence) = old_sequence.take() { - tracing::debug!(subscribe = %subscribe.id, track = %track.info.name, old = %old_sequence, %latest, "aborting group"); - old_group.take(); // Drop the future to cancel it. - } - - assert!(old_group.is_none()); - - if sequence >= *latest { - old_group = new_group; - old_sequence = new_sequence; - - new_group = Some(handle); - new_sequence = Some(sequence); - } else { - old_group = Some(handle); - old_sequence = Some(sequence); - } + tasks.push(Self::serve_group(session.clone(), msg, priority, group, version).map(|_| ())); } } From 1b5e8b34fbacc273f2616e0753153fbcd1b2dddf Mon Sep 17 00:00:00 2001 From: Luke Curley Date: Wed, 28 Jan 2026 07:45:34 -0300 Subject: [PATCH 3/5] Fix some bugs and simplify the API. --- js/hang/src/container/legacy.ts | 3 +- js/hang/src/watch/video/source.ts | 137 +++++----- rs/hang-cli/src/client.rs | 4 +- rs/hang-cli/src/server.rs | 4 +- rs/hang/examples/video.rs | 10 +- rs/hang/src/catalog/root.rs | 9 +- rs/hang/src/import/aac.rs | 7 +- rs/hang/src/import/avc3.rs | 5 +- rs/hang/src/import/fmp4.rs | 10 +- rs/hang/src/import/hev1.rs | 5 +- rs/hang/src/import/hls.rs | 2 +- rs/hang/src/import/opus.rs | 7 +- rs/hang/src/model/broadcast.rs | 4 +- rs/moq-clock/src/main.rs | 9 +- rs/moq-lite/src/ietf/publisher.rs | 4 +- rs/moq-lite/src/ietf/subscriber.rs | 10 +- rs/moq-lite/src/lite/publisher.rs | 4 +- rs/moq-lite/src/lite/subscriber.rs | 6 +- rs/moq-lite/src/model/broadcast.rs | 102 +++----- rs/moq-lite/src/model/frame.rs | 12 +- rs/moq-lite/src/model/group.rs | 16 +- rs/moq-lite/src/model/mod.rs | 2 - rs/moq-lite/src/model/origin.rs | 396 ++++++++++++++--------------- rs/moq-lite/src/model/produce.rs | 10 - rs/moq-lite/src/model/track.rs | 24 +- rs/moq-native/examples/chat.rs | 8 +- rs/moq-relay/src/cluster.rs | 38 ++- 27 files changed, 370 insertions(+), 478 deletions(-) delete mode 100644 rs/moq-lite/src/model/produce.rs diff --git a/js/hang/src/container/legacy.ts b/js/hang/src/container/legacy.ts index d3ab74a1c..f4b4c738e 100644 --- a/js/hang/src/container/legacy.ts +++ b/js/hang/src/container/legacy.ts @@ -209,7 +209,8 @@ export class Consumer { if (this.#active !== undefined && first.consumer.sequence <= this.#active) { this.#groups.shift(); - console.warn(`skipping slow group: ${first.consumer.sequence} < ${this.#groups[0]?.consumer.sequence}`); + // TODO Debug why this fires? + // console.warn(`skipping slow group: ${first.consumer.sequence} < ${this.#groups[0]?.consumer.sequence}`); first.consumer.close(); first.frames.length = 0; diff --git a/js/hang/src/watch/video/source.ts b/js/hang/src/watch/video/source.ts index 8e0b951cc..dce5d71d1 100644 --- a/js/hang/src/watch/video/source.ts +++ b/js/hang/src/watch/video/source.ts @@ -1,5 +1,5 @@ import type * as Moq from "@moq/lite"; -import type { Time } from "@moq/lite"; +import { Time } from "@moq/lite"; import { Effect, type Getter, Signal } from "@moq/signals"; import type * as Catalog from "../../catalog"; import * as Container from "../../container"; @@ -49,9 +49,6 @@ export interface VideoStats { // TODO Maybe we need to detect b-frames and make this dynamic? const MIN_SYNC_WAIT_MS = 200 as Time.Milli; -// The maximum number of concurrent b-frames that we support. -const MAX_BFRAMES = 10; - // Responsible for switching between video tracks and buffering frames. export class Source { broadcast: Signal; @@ -83,6 +80,10 @@ export class Source { // Expose the current frame to render as a signal frame = new Signal(undefined); + // The timestamp of the current frame. + #timestamp = new Signal(undefined); + readonly timestamp: Getter = this.#timestamp; + // Additional buffer in milliseconds (on top of catalog's minBuffer). buffer: Signal; @@ -227,83 +228,62 @@ export class Source { const sub = broadcast.subscribe(name, PRIORITY.video); // TODO use priority from catalog effect.cleanup(() => sub.close()); - // We need a queue because VideoDecoder doesn't block on a Promise returned by output. - // NOTE: We will drain this queue almost immediately, so the highWaterMark is just a safety net. - const queue = new TransformStream( - undefined, - { highWaterMark: MAX_BFRAMES }, - { highWaterMark: MAX_BFRAMES }, - ); - - const writer = queue.writable.getWriter(); - effect.cleanup(() => writer.close()); - - const reader = queue.readable.getReader(); - effect.cleanup(async () => { - // Drain any remaining frames in the queue to prevent memory leaks - try { - let result = await reader.read(); - while (!result.done) { - result.value?.close(); - result = await reader.read(); - } - } catch (error) { - console.error("Error during frame draining:", error); - } finally { - await reader.cancel(); + effect.cleanup(() => { + if (this.#active === effect) { + this.#timestamp.set(undefined); } }); const decoder = new VideoDecoder({ output: async (frame: VideoFrame) => { - // Insert into a queue so we can perform ordered sleeps. - // If this were to block, I believe WritableStream is still ordered. try { - await writer.write(frame); - } catch { - frame.close(); - } - }, - // TODO bubble up error - error: (error) => { - console.error(error); - effect.close(); - }, - }); - effect.cleanup(() => decoder.close()); + const timestamp = Time.Milli.fromMicro(frame.timestamp as Time.Micro); + if (timestamp < (this.#timestamp.peek() ?? 0)) { + // Late frame, don't render it. + return; + } - // Output processing - same for both container types - effect.spawn(async () => { - for (;;) { - const { value: frame } = await reader.read(); - if (!frame) break; - - // Sleep until it's time to decode the next frame. - const ref = performance.now() - frame.timestamp / 1000; - - let sleep = 0; - if (!this.#reference || ref < this.#reference) { - this.#reference = ref; - // Don't sleep so we immediately render this frame. - } else { - sleep = this.#reference - ref + this.#latency.peek(); - } + // Sleep until it's time to decode the next frame. + // NOTE: This function runs in parallel for each frame. + const ref = performance.now() - timestamp; - if (sleep > MIN_SYNC_WAIT_MS) { - this.syncStatus.set({ state: "wait", bufferDuration: sleep }); - } + let sleep = 0; + if (!this.#reference || ref < this.#reference) { + this.#reference = ref; + // Don't sleep so we immediately render this frame. + } else { + sleep = this.#reference - ref + this.#latency.peek(); + } - if (sleep > 0) { - // NOTE: WebCodecs doesn't block on output promises (I think?), so these sleeps will occur concurrently. - // TODO: This cause the `syncStatus` to be racey especially - await new Promise((resolve) => setTimeout(resolve, sleep)); - } + // TODO this syncStatus stuff is racey + if (sleep > MIN_SYNC_WAIT_MS) { + this.syncStatus.set({ state: "wait", bufferDuration: sleep }); + } + + if (sleep > 0) { + // NOTE: WebCodecs doesn't block on output promises (I think?), so these sleeps will occur concurrently. + // TODO: This cause the `syncStatus` to be racey especially + await new Promise((resolve) => setTimeout(resolve, sleep)); + } + + if (timestamp < (this.#timestamp.peek() ?? 0)) { + // Late frame (after sleeping), don't render it. + return; + } - if (sleep > MIN_SYNC_WAIT_MS) { - // Include how long we slept if it was above the threshold. - this.syncStatus.set({ state: "ready", bufferDuration: sleep }); - } else { - this.syncStatus.set({ state: "ready" }); + if (sleep > MIN_SYNC_WAIT_MS) { + // Include how long we slept if it was above the threshold. + this.syncStatus.set({ state: "ready", bufferDuration: sleep }); + } else { + this.syncStatus.set({ state: "ready" }); + } + + this.#timestamp.set(timestamp); + + this.frame.update((prev) => { + prev?.close(); + return frame.clone(); // avoid closing the frame here + }); // If the track switch was pending, complete it now. if (this.#pending === effect) { @@ -312,14 +292,17 @@ export class Source { this.#pending = undefined; effect.set(this.active, name); } + } finally { + frame.close(); } - - this.frame.update((prev) => { - prev?.close(); - return frame; - }); - } + }, + // TODO bubble up error + error: (error) => { + console.error(error); + effect.close(); + }, }); + effect.cleanup(() => decoder.close()); // Input processing - depends on container type if (config.container.kind === "cmaf") { diff --git a/rs/hang-cli/src/client.rs b/rs/hang-cli/src/client.rs index fa14f569e..b815a1e46 100644 --- a/rs/hang-cli/src/client.rs +++ b/rs/hang-cli/src/client.rs @@ -6,12 +6,12 @@ use url::Url; pub async fn run_client(client: moq_native::Client, url: Url, name: String, publish: Publish) -> anyhow::Result<()> { // Create an origin producer to publish to the broadcast. let origin = moq_lite::Origin::produce(); - origin.producer.publish_broadcast(&name, publish.consume()); + origin.publish_broadcast(&name, publish.consume()); tracing::info!(%url, %name, "connecting"); // Establish the connection, not providing a subscriber. - let session = client.with_publish(origin.consumer).connect(url).await?; + let session = client.with_publish(origin.consume()).connect(url).await?; #[cfg(unix)] // Notify systemd that we're ready. diff --git a/rs/hang-cli/src/server.rs b/rs/hang-cli/src/server.rs index 02ce9def7..032459513 100644 --- a/rs/hang-cli/src/server.rs +++ b/rs/hang-cli/src/server.rs @@ -40,10 +40,10 @@ async fn run_session( ) -> anyhow::Result<()> { // Create an origin producer to publish to the broadcast. let origin = moq_lite::Origin::produce(); - origin.producer.publish_broadcast(&name, consumer); + origin.publish_broadcast(&name, consumer); // Blindly accept the session (WebTransport or QUIC), regardless of the URL. - let session = session.with_publish(origin.consumer).accept().await?; + let session = session.with_publish(origin.consume()).accept().await?; tracing::info!(id, "accepted session"); diff --git a/rs/hang/examples/video.rs b/rs/hang/examples/video.rs index 1b169d605..ab26899d3 100644 --- a/rs/hang/examples/video.rs +++ b/rs/hang/examples/video.rs @@ -13,8 +13,8 @@ async fn main() -> anyhow::Result<()> { // This is a simple example of how you can concurrently run multiple tasks. // tokio::spawn works too. tokio::select! { - res = run_broadcast(origin.producer) => res, - res = run_session(origin.consumer) => res, + res = run_session(origin.consume()) => res, + res = run_broadcast(origin) => res, } } @@ -93,7 +93,7 @@ fn create_track(broadcast: &mut moq_lite::BroadcastProducer) -> hang::TrackProdu .produce(); // Publish the catalog track to the broadcast. - broadcast.insert_track(catalog.consumer.track); + broadcast.insert_track(&catalog.track); // Actually create the media track now. let track = broadcast.create_track(video_track); @@ -106,11 +106,11 @@ fn create_track(broadcast: &mut moq_lite::BroadcastProducer) -> hang::TrackProdu async fn run_broadcast(origin: moq_lite::OriginProducer) -> anyhow::Result<()> { // Create and publish a broadcast to the origin. let mut broadcast = moq_lite::Broadcast::produce(); - let mut track = create_track(&mut broadcast.producer); + let mut track = create_track(&mut broadcast); // NOTE: The path is empty because we're using the URL to scope the broadcast. // OPTIONAL: We publish after inserting the tracks just to avoid a nearly impossible race condition. - origin.publish_broadcast("", broadcast.consumer); + origin.publish_broadcast("", broadcast.consume()); // Not real frames of course. track.write(hang::Frame { diff --git a/rs/hang/src/catalog/root.rs b/rs/hang/src/catalog/root.rs index 28eb3c0d1..5c97e4aa7 100644 --- a/rs/hang/src/catalog/root.rs +++ b/rs/hang/src/catalog/root.rs @@ -7,7 +7,6 @@ use serde::{Deserialize, Serialize}; use crate::Result; use crate::catalog::{Audio, AudioConfig, Chat, User, Video, VideoConfig}; -use moq_lite::Produce; /// A catalog track, created by a broadcaster to describe the tracks available in a broadcast. #[serde_with::serde_as] @@ -83,13 +82,9 @@ impl Catalog { } /// Produce a catalog track that describes the available media tracks. - pub fn produce(self) -> Produce { + pub fn produce(self) -> CatalogProducer { let track = Catalog::default_track().produce(); - - Produce { - producer: CatalogProducer::new(track.producer, self), - consumer: track.consumer.into(), - } + CatalogProducer::new(track, self) } pub fn default_track() -> moq_lite::Track { diff --git a/rs/hang/src/import/aac.rs b/rs/hang/src/import/aac.rs index 844d458f3..d8f9a9c3f 100644 --- a/rs/hang/src/import/aac.rs +++ b/rs/hang/src/import/aac.rs @@ -113,14 +113,13 @@ impl Aac { tracing::debug!(name = ?track.name, ?config, "starting track"); - let track = track.produce(); - self.broadcast.insert_track(track.consumer); + let track = self.broadcast.create_track(track); let mut catalog = self.broadcast.catalog.lock(); - let audio = catalog.insert_audio(track.producer.info.name.clone(), config); + let audio = catalog.insert_audio(track.info.name.clone(), config); audio.priority = 2; - self.track = Some(track.producer.into()); + self.track = Some(track.into()); Ok(()) } diff --git a/rs/hang/src/import/avc3.rs b/rs/hang/src/import/avc3.rs index af9171c13..a4bdf6fe4 100644 --- a/rs/hang/src/import/avc3.rs +++ b/rs/hang/src/import/avc3.rs @@ -91,11 +91,10 @@ impl Avc3 { video.priority = 2; } - let track = track.produce(); - self.broadcast.insert_track(track.consumer); + let track = self.broadcast.create_track(track); self.config = Some(config); - self.track = Some(track.producer.into()); + self.track = Some(track.into()); Ok(()) } diff --git a/rs/hang/src/import/fmp4.rs b/rs/hang/src/import/fmp4.rs index 4be9e9f73..b2e58ef37 100644 --- a/rs/hang/src/import/fmp4.rs +++ b/rs/hang/src/import/fmp4.rs @@ -185,9 +185,8 @@ impl Fmp4 { // Record this track name created_video_tracks.push(track.name.clone()); - let track = track.produce(); - self.broadcast.insert_track(track.consumer); - hang::TrackProducer::new(track.producer) + let track = self.broadcast.create_track(track); + hang::TrackProducer::new(track) } b"soun" => { let config = self.init_audio(trak)?; @@ -203,9 +202,8 @@ impl Fmp4 { // Record this track name created_audio_tracks.push(track.name.clone()); - let track = track.produce(); - self.broadcast.insert_track(track.consumer); - hang::TrackProducer::new(track.producer) + let track = self.broadcast.create_track(track); + hang::TrackProducer::new(track) } b"sbtl" => anyhow::bail!("subtitle tracks are not supported"), handler => anyhow::bail!("unknown track type: {:?}", handler), diff --git a/rs/hang/src/import/hev1.rs b/rs/hang/src/import/hev1.rs index 4d203889b..571c3d51b 100644 --- a/rs/hang/src/import/hev1.rs +++ b/rs/hang/src/import/hev1.rs @@ -90,11 +90,10 @@ impl Hev1 { video.priority = 2; } - let track = track.produce(); - self.broadcast.insert_track(track.consumer); + let track = self.broadcast.create_track(track); self.config = Some(config); - self.track = Some(track.producer.into()); + self.track = Some(track.into()); Ok(()) } diff --git a/rs/hang/src/import/hls.rs b/rs/hang/src/import/hls.rs index 284bdf39a..1779ed376 100644 --- a/rs/hang/src/import/hls.rs +++ b/rs/hang/src/import/hls.rs @@ -626,7 +626,7 @@ mod tests { #[test] fn hls_ingest_starts_without_importers() { - let broadcast = moq_lite::Broadcast::produce().producer.into(); + let broadcast = moq_lite::Broadcast::produce().into(); let url = "https://example.com/master.m3u8".to_string(); let cfg = HlsConfig::new(url); let hls = Hls::new(broadcast, cfg).unwrap(); diff --git a/rs/hang/src/import/opus.rs b/rs/hang/src/import/opus.rs index 99583d1ea..a5acf71ad 100644 --- a/rs/hang/src/import/opus.rs +++ b/rs/hang/src/import/opus.rs @@ -59,14 +59,13 @@ impl Opus { tracing::debug!(name = ?track.name, ?config, "starting track"); - let track = track.produce(); - self.broadcast.insert_track(track.consumer); + let track = self.broadcast.create_track(track); let mut catalog = self.broadcast.catalog.lock(); - let audio = catalog.insert_audio(track.producer.info.name.clone(), config); + let audio = catalog.insert_audio(track.info.name.clone(), config); audio.priority = 2; - self.track = Some(track.producer.into()); + self.track = Some(track.into()); Ok(()) } diff --git a/rs/hang/src/model/broadcast.rs b/rs/hang/src/model/broadcast.rs index ca6e671c7..a88e13b62 100644 --- a/rs/hang/src/model/broadcast.rs +++ b/rs/hang/src/model/broadcast.rs @@ -19,11 +19,11 @@ pub struct BroadcastProducer { impl BroadcastProducer { pub fn new(mut inner: moq_lite::BroadcastProducer) -> Self { let catalog = Catalog::default().produce(); - inner.insert_track(catalog.consumer.track); + inner.insert_track(&catalog.track); Self { inner, - catalog: catalog.producer, + catalog, track_id: Default::default(), } } diff --git a/rs/moq-clock/src/main.rs b/rs/moq-clock/src/main.rs index a1c14eb61..4797ae9c6 100644 --- a/rs/moq-clock/src/main.rs +++ b/rs/moq-clock/src/main.rs @@ -63,12 +63,12 @@ async fn main() -> anyhow::Result<()> { match config.role { Command::Publish => { let mut broadcast = moq_lite::Broadcast::produce(); - let track = broadcast.producer.create_track(track); + let track = broadcast.create_track(track); let clock = clock::Publisher::new(track); - origin.producer.publish_broadcast(&config.broadcast, broadcast.consumer); + origin.publish_broadcast(&config.broadcast, broadcast.consume()); - let session = client.with_publish(origin.consumer).connect(config.url).await?; + let session = client.with_publish(origin.consume()).connect(config.url).await?; tokio::select! { res = session.closed() => res.context("session closed"), @@ -76,7 +76,7 @@ async fn main() -> anyhow::Result<()> { } } Command::Subscribe => { - let session = client.with_consume(origin.producer).connect(config.url).await?; + let session = client.with_consume(origin.clone()).connect(config.url).await?; // NOTE: We could just call `session.consume_broadcast(&config.broadcast)` instead, // However that won't work with IETF MoQ and the current OriginConsumer API the moment. @@ -86,7 +86,6 @@ async fn main() -> anyhow::Result<()> { let path: moq_lite::Path<'_> = config.broadcast.into(); let mut origin = origin - .consumer .consume_only(&[path]) .context("not allowed to consume broadcast")?; diff --git a/rs/moq-lite/src/ietf/publisher.rs b/rs/moq-lite/src/ietf/publisher.rs index 9c0224cc9..07cbc924e 100644 --- a/rs/moq-lite/src/ietf/publisher.rs +++ b/rs/moq-lite/src/ietf/publisher.rs @@ -25,8 +25,8 @@ pub(super) struct Publisher { impl Publisher { pub fn new(session: S, origin: Option, control: Control, version: Version) -> Self { - // Default to a dummy origin that is immediately closed. - let origin = origin.unwrap_or_else(|| Origin::produce().consumer); + // Default to a dummy origin that is empty/closed. + let origin = origin.unwrap_or_else(|| Origin::produce().consume()); Self { session, origin, diff --git a/rs/moq-lite/src/ietf/subscriber.rs b/rs/moq-lite/src/ietf/subscriber.rs index b8ace558d..1b7a4548e 100644 --- a/rs/moq-lite/src/ietf/subscriber.rs +++ b/rs/moq-lite/src/ietf/subscriber.rs @@ -88,12 +88,12 @@ impl Subscriber { } Entry::Vacant(entry) => { let broadcast = Broadcast::produce(); - origin.publish_broadcast(path.clone(), broadcast.consumer); + origin.publish_broadcast(path.clone(), broadcast.consume()); entry.insert(BroadcastState { - producer: broadcast.producer.clone(), + producer: broadcast.clone(), count: 1, }); - broadcast.producer + broadcast } }; @@ -451,7 +451,7 @@ impl Subscriber { match state.subscribes.entry(request_id) { Entry::Vacant(entry) => { entry.insert(TrackState { - producer: track.producer, + producer: track.clone(), alias: Some(msg.track_alias), }); } @@ -466,7 +466,7 @@ impl Subscriber { // NOTE: This is debated in the IETF draft, but is significantly easier to implement. let mut broadcast = self.start_announce(msg.track_namespace.to_owned())?; - let exists = broadcast.insert_track(track.consumer); + let exists = broadcast.insert_track(&track); if exists { tracing::warn!(track = %msg.track_name, "track already exists, replacing it"); } diff --git a/rs/moq-lite/src/lite/publisher.rs b/rs/moq-lite/src/lite/publisher.rs index f271064f7..923f0f5a9 100644 --- a/rs/moq-lite/src/lite/publisher.rs +++ b/rs/moq-lite/src/lite/publisher.rs @@ -22,8 +22,8 @@ pub(super) struct Publisher { impl Publisher { pub fn new(session: S, origin: Option, version: Version) -> Self { - // Default to a dummy origin that is immediately closed. - let origin = origin.unwrap_or_else(|| Origin::produce().consumer); + // Default to a dummy origin that is empty/closed. + let origin = origin.unwrap_or_else(|| Origin::produce().consume()); Self { session, origin, diff --git a/rs/moq-lite/src/lite/subscriber.rs b/rs/moq-lite/src/lite/subscriber.rs index 9054ca9bd..41bbd3007 100644 --- a/rs/moq-lite/src/lite/subscriber.rs +++ b/rs/moq-lite/src/lite/subscriber.rs @@ -134,16 +134,16 @@ impl Subscriber { // Make sure the peer doesn't double announce. match producers.entry(path.to_owned()) { Entry::Occupied(_) => return Err(Error::Duplicate), - Entry::Vacant(entry) => entry.insert(broadcast.producer.clone()), + Entry::Vacant(entry) => entry.insert(broadcast.clone()), }; // Run the broadcast in the background until all consumers are dropped. self.origin .as_mut() .unwrap() - .publish_broadcast(path.clone(), broadcast.consumer); + .publish_broadcast(path.clone(), broadcast.consume()); - web_async::spawn(self.clone().run_broadcast(path, broadcast.producer)); + web_async::spawn(self.clone().run_broadcast(path, broadcast)); Ok(()) } diff --git a/rs/moq-lite/src/model/broadcast.rs b/rs/moq-lite/src/model/broadcast.rs index c8affe410..8332733ad 100644 --- a/rs/moq-lite/src/model/broadcast.rs +++ b/rs/moq-lite/src/model/broadcast.rs @@ -7,7 +7,7 @@ use std::{ }, }; -use crate::{Error, Produce, TrackConsumer, TrackProducer}; +use crate::{Error, TrackConsumer, TrackProducer}; use tokio::sync::watch; use web_async::Lock; @@ -16,11 +16,11 @@ use super::Track; struct State { // When explicitly publishing, we hold a reference to the consumer. // This prevents the track from being marked as "unused". - published: HashMap, + consumers: HashMap, // When requesting, we hold a reference to the producer for dynamic tracks. // The track will be marked as "unused" when the last consumer is dropped. - requested: HashMap, + producers: HashMap, } /// A collection of media tracks that can be published and subscribed to. @@ -32,10 +32,8 @@ pub struct Broadcast { } impl Broadcast { - pub fn produce() -> Produce { - let producer = BroadcastProducer::new(); - let consumer = producer.consume(); - Produce { producer, consumer } + pub fn produce() -> BroadcastProducer { + BroadcastProducer::new() } } @@ -57,11 +55,11 @@ impl Default for BroadcastProducer { } impl BroadcastProducer { - fn new() -> Self { + pub fn new() -> Self { Self { state: Lock::new(State { - published: HashMap::new(), - requested: HashMap::new(), + consumers: HashMap::new(), + producers: HashMap::new(), }), closed: Default::default(), requested: async_channel::unbounded(), @@ -76,24 +74,24 @@ impl BroadcastProducer { /// Produce a new track and insert it into the broadcast. pub fn create_track(&mut self, track: Track) -> TrackProducer { - let track = track.clone().produce(); - self.insert_track(track.consumer); - track.producer + let track = TrackProducer::new(track); + self.insert_track(&track); + track } /// Insert a track into the lookup, returning true if it was unique. - pub fn insert_track(&mut self, track: TrackConsumer) -> bool { + /// + /// This takes a ref just so it's more clear that you're supposed to keep publishing to the TrackProducer. + pub fn insert_track(&mut self, track: &TrackProducer) -> bool { let mut state = self.state.lock(); - let unique = state.published.insert(track.info.name.clone(), track.clone()).is_none(); - let removed = state.requested.remove(&track.info.name).is_some(); - - unique && !removed + state.consumers.insert(track.info.name.clone(), track.consume()); + state.producers.insert(track.info.name.clone(), track.clone()).is_none() } /// Remove a track from the lookup. pub fn remove_track(&mut self, name: &str) -> bool { let mut state = self.state.lock(); - state.published.remove(name).is_some() || state.requested.remove(name).is_some() + state.consumers.remove(name).is_some() || state.producers.remove(name).is_some() } pub fn consume(&self) -> BroadcastConsumer { @@ -152,8 +150,8 @@ impl Drop for BroadcastProducer { let mut state = self.state.lock(); // Cleanup any published tracks. - state.published.clear(); - state.requested.clear(); + state.consumers.clear(); + state.producers.clear(); } } @@ -194,20 +192,13 @@ impl BroadcastConsumer { pub fn subscribe_track(&self, track: &Track) -> TrackConsumer { let mut state = self.state.lock(); - // Return any explictly published track. - if let Some(consumer) = state.published.get(&track.name).cloned() { - return consumer; - } - - // Return any requested tracks. - if let Some(producer) = state.requested.get(&track.name) { + if let Some(producer) = state.producers.get(&track.name) { return producer.consume(); } // Otherwise we have never seen this track before and need to create a new producer. - let track = track.clone().produce(); - let producer = track.producer; - let consumer = track.consumer; + let producer = track.clone().produce(); + let consumer = producer.consume(); // Insert the producer into the lookup so we will deduplicate requests. // This is not a subscriber so it doesn't count towards "used" subscribers. @@ -222,13 +213,13 @@ impl BroadcastConsumer { } // Insert the producer into the lookup so we will deduplicate requests. - state.requested.insert(producer.info.name.clone(), producer.clone()); + state.producers.insert(producer.info.name.clone(), producer.clone()); // Remove the track from the lookup when it's unused. let state = self.state.clone(); web_async::spawn(async move { producer.unused().await; - state.lock().requested.remove(&producer.info.name); + state.lock().producers.remove(&producer.info.name); }); consumer @@ -265,32 +256,6 @@ impl BroadcastConsumer { mod test { use super::*; - #[tokio::test] - async fn insert() { - let mut producer = BroadcastProducer::new(); - let mut track1 = Track::new("track1").produce(); - - // Make sure we can insert before a consumer is created. - producer.insert_track(track1.consumer); - track1.producer.append_group(); - - let consumer = producer.consume(); - - let mut track1_sub = consumer.subscribe_track(&track1.producer.info); - track1_sub.assert_group(); - - let mut track2 = Track::new("track2").produce(); - producer.insert_track(track2.consumer); - - let consumer2 = producer.consume(); - let mut track2_consumer = consumer2.subscribe_track(&track2.producer.info); - track2_consumer.assert_no_group(); - - track2.producer.append_group(); - - track2_consumer.assert_group(); - } - #[tokio::test] async fn unused() { let producer = BroadcastProducer::new(); @@ -333,11 +298,10 @@ mod test { consumer.assert_not_closed(); // Create a new track and insert it into the broadcast. - let mut track1 = Track::new("track1").produce(); - track1.producer.append_group(); - producer.insert_track(track1.consumer); + let mut track1 = producer.create_track(Track::new("track1")); + track1.append_group(); - let mut track1c = consumer.subscribe_track(&track1.producer.info); + let mut track1c = consumer.subscribe_track(&track1.info); let track2 = consumer.subscribe_track(&Track::new("track2")); drop(producer); @@ -352,7 +316,7 @@ mod test { track1c.assert_not_closed(); // TODO: We should probably cascade the closed state. - drop(track1.producer); + drop(track1); track1c.assert_closed(); } @@ -410,10 +374,10 @@ mod test { let mut broadcast = Broadcast::produce(); // Subscribe to a track that doesn't exist - this creates a request - let consumer1 = broadcast.consumer.subscribe_track(&Track::new("unknown_track")); + let consumer1 = broadcast.consume().subscribe_track(&Track::new("unknown_track")); // Get the requested track producer - let producer1 = broadcast.producer.assert_request(); + let producer1 = broadcast.assert_request(); // The track producer should NOT be unused yet because there's a consumer assert!( @@ -422,7 +386,7 @@ mod test { ); // Making a new consumer will keep the producer alive - let consumer2 = broadcast.consumer.subscribe_track(&Track::new("unknown_track")); + let consumer2 = broadcast.consume().subscribe_track(&Track::new("unknown_track")); consumer2.assert_is_clone(&consumer1); // Drop the consumer subscription @@ -449,8 +413,8 @@ mod test { tokio::time::sleep(std::time::Duration::from_millis(1)).await; // Now the cleanup task should have run and we can subscribe again to the unknown track. - let consumer3 = broadcast.consumer.subscribe_track(&Track::new("unknown_track")); - let producer2 = broadcast.producer.assert_request(); + let consumer3 = broadcast.consume().subscribe_track(&Track::new("unknown_track")); + let producer2 = broadcast.assert_request(); // Drop the consumer, now the producer should be unused drop(consumer3); diff --git a/rs/moq-lite/src/model/frame.rs b/rs/moq-lite/src/model/frame.rs index 23c2b29f9..7d0cf8835 100644 --- a/rs/moq-lite/src/model/frame.rs +++ b/rs/moq-lite/src/model/frame.rs @@ -3,7 +3,7 @@ use std::future::Future; use bytes::{Bytes, BytesMut}; use tokio::sync::watch; -use crate::{Error, Produce, Result}; +use crate::{Error, Result}; /// A chunk of data with an upfront size. #[derive(Clone, Debug)] @@ -13,11 +13,9 @@ pub struct Frame { } impl Frame { - /// Create a new producer and consumer for the frame. - pub fn produce(self) -> Produce { - let producer = FrameProducer::new(self); - let consumer = producer.consume(); - Produce { producer, consumer } + /// Create a new producer for the frame. + pub fn produce(self) -> FrameProducer { + FrameProducer::new(self) } } @@ -68,7 +66,7 @@ pub struct FrameProducer { } impl FrameProducer { - fn new(info: Frame) -> Self { + pub fn new(info: Frame) -> Self { Self { info, state: Default::default(), diff --git a/rs/moq-lite/src/model/group.rs b/rs/moq-lite/src/model/group.rs index 7e935ddeb..de3d1b0a8 100644 --- a/rs/moq-lite/src/model/group.rs +++ b/rs/moq-lite/src/model/group.rs @@ -12,7 +12,7 @@ use std::future::Future; use bytes::Bytes; use tokio::sync::watch; -use crate::{Error, Produce, Result}; +use crate::{Error, Result}; use super::{Frame, FrameConsumer, FrameProducer}; @@ -26,10 +26,8 @@ pub struct Group { } impl Group { - pub fn produce(self) -> Produce { - let producer = GroupProducer::new(self); - let consumer = producer.consume(); - Produce { producer, consumer } + pub fn producer(self) -> GroupProducer { + GroupProducer::new(self) } } @@ -83,7 +81,7 @@ pub struct GroupProducer { } impl GroupProducer { - fn new(info: Group) -> Self { + pub fn new(info: Group) -> Self { Self { info, state: Default::default(), @@ -106,9 +104,9 @@ impl GroupProducer { /// Create a frame with an upfront size pub fn create_frame(&mut self, info: Frame) -> FrameProducer { - let frame = Frame::produce(info); - self.append_frame(frame.consumer); - frame.producer + let frame = info.produce(); + self.append_frame(frame.consume()); + frame } /// Append a frame to the group. diff --git a/rs/moq-lite/src/model/mod.rs b/rs/moq-lite/src/model/mod.rs index c3bd0f5fc..460a300df 100644 --- a/rs/moq-lite/src/model/mod.rs +++ b/rs/moq-lite/src/model/mod.rs @@ -2,7 +2,6 @@ mod broadcast; mod frame; mod group; mod origin; -mod produce; mod time; mod track; @@ -10,6 +9,5 @@ pub use broadcast::*; pub use frame::*; pub use group::*; pub use origin::*; -pub use produce::*; pub use time::*; pub use track::*; diff --git a/rs/moq-lite/src/model/origin.rs b/rs/moq-lite/src/model/origin.rs index f0dff3613..c8df8a2b4 100644 --- a/rs/moq-lite/src/model/origin.rs +++ b/rs/moq-lite/src/model/origin.rs @@ -6,7 +6,7 @@ use tokio::sync::mpsc; use web_async::Lock; use super::BroadcastConsumer; -use crate::{AsPath, Broadcast, BroadcastProducer, Path, PathOwned, Produce}; +use crate::{AsPath, Broadcast, BroadcastProducer, Path, PathOwned}; static NEXT_CONSUMER_ID: AtomicU64 = AtomicU64::new(0); @@ -334,10 +334,8 @@ pub type OriginAnnounce = (PathOwned, Option); pub struct Origin {} impl Origin { - pub fn produce() -> Produce { - let producer = OriginProducer::default(); - let consumer = producer.consume(); - Produce { producer, consumer } + pub fn produce() -> OriginProducer { + OriginProducer::new() } } @@ -353,14 +351,17 @@ pub struct OriginProducer { } impl OriginProducer { + pub fn new() -> Self { + Self::default() + } + /// Create and publish a new broadcast, returning the producer. /// /// This is a helper method when you only want to publish a broadcast to a single origin. /// Returns [None] if the broadcast is not allowed to be published. pub fn create_broadcast(&self, path: impl AsPath) -> Option { let broadcast = Broadcast::produce(); - self.publish_broadcast(path, broadcast.consumer) - .then_some(broadcast.producer) + self.publish_broadcast(path, broadcast.consume()).then_some(broadcast) } /// Publish a broadcast, announcing it to all consumers. @@ -416,6 +417,16 @@ impl OriginProducer { Some(OriginConsumer::new(self.root.clone(), self.nodes.select(prefixes)?)) } + /// Subscribe to a specific broadcast. + /// + /// Returns None if the broadcast is not found. + pub fn consume_broadcast(&self, path: impl AsPath) -> Option { + let path = path.as_path(); + let (root, rest) = self.nodes.get(&path)?; + let state = root.lock(); + state.consume_broadcast(&rest) + } + /// Returns a new OriginProducer that automatically strips out the provided prefix. /// /// Returns None if the provided root is not authorized; when publish_only was already used without a wildcard. @@ -611,32 +622,32 @@ mod tests { let broadcast1 = Broadcast::produce(); let broadcast2 = Broadcast::produce(); - let mut consumer1 = origin.consumer; + let mut consumer1 = origin.consume(); // Make a new consumer that should get it. consumer1.assert_next_wait(); // Publish the first broadcast. - origin.producer.publish_broadcast("test1", broadcast1.consumer); + origin.publish_broadcast("test1", broadcast1.consume()); - consumer1.assert_next("test1", &broadcast1.producer.consume()); + consumer1.assert_next("test1", &broadcast1.consume()); consumer1.assert_next_wait(); // Make a new consumer that should get the existing broadcast. // But we don't consume it yet. - let mut consumer2 = origin.producer.consume(); + let mut consumer2 = origin.consume(); // Publish the second broadcast. - origin.producer.publish_broadcast("test2", broadcast2.consumer); + origin.publish_broadcast("test2", broadcast2.consume()); - consumer1.assert_next("test2", &broadcast2.producer.consume()); + consumer1.assert_next("test2", &broadcast2.consume()); consumer1.assert_next_wait(); - consumer2.assert_next("test1", &broadcast1.producer.consume()); - consumer2.assert_next("test2", &broadcast2.producer.consume()); + consumer2.assert_next("test1", &broadcast1.consume()); + consumer2.assert_next("test2", &broadcast2.consume()); consumer2.assert_next_wait(); // Close the first broadcast. - drop(broadcast1.producer); + drop(broadcast1); // Wait for the async task to run. tokio::time::sleep(tokio::time::Duration::from_millis(1)).await; @@ -648,12 +659,12 @@ mod tests { consumer2.assert_next_wait(); // And a new consumer only gets the last broadcast. - let mut consumer3 = origin.producer.consume(); - consumer3.assert_next("test2", &broadcast2.producer.consume()); + let mut consumer3 = origin.consume(); + consumer3.assert_next("test2", &broadcast2.consume()); consumer3.assert_next_wait(); // Close the other producer and make sure it cleans up - drop(broadcast2.producer); + drop(broadcast2); // Wait for the async task to run. tokio::time::sleep(tokio::time::Duration::from_millis(1)).await; @@ -671,56 +682,57 @@ mod tests { #[tokio::test] async fn test_duplicate() { - let mut origin = Origin::produce(); + let origin = Origin::produce(); let broadcast1 = Broadcast::produce(); let broadcast2 = Broadcast::produce(); let broadcast3 = Broadcast::produce(); - let consumer1 = broadcast1.consumer; - let consumer2 = broadcast2.consumer; - let consumer3 = broadcast3.consumer; + let consumer1 = broadcast1.consume(); + let consumer2 = broadcast2.consume(); + let consumer3 = broadcast3.consume(); - origin.producer.publish_broadcast("test", consumer1.clone()); - origin.producer.publish_broadcast("test", consumer2.clone()); - origin.producer.publish_broadcast("test", consumer3.clone()); + origin.publish_broadcast("test", consumer1.clone()); + origin.publish_broadcast("test", consumer2.clone()); + origin.publish_broadcast("test", consumer3.clone()); - assert!(origin.consumer.consume_broadcast("test").is_some()); + let mut consumer = origin.consume(); + assert!(consumer.consume_broadcast("test").is_some()); - origin.consumer.assert_next("test", &consumer1); - origin.consumer.assert_next_none("test"); - origin.consumer.assert_next("test", &consumer2); - origin.consumer.assert_next_none("test"); - origin.consumer.assert_next("test", &consumer3); + consumer.assert_next("test", &consumer1); + consumer.assert_next_none("test"); + consumer.assert_next("test", &consumer2); + consumer.assert_next_none("test"); + consumer.assert_next("test", &consumer3); // Drop the backup, nothing should change. - drop(broadcast2.producer); + drop(broadcast2); // Wait for the async task to run. tokio::time::sleep(tokio::time::Duration::from_millis(1)).await; - assert!(origin.consumer.consume_broadcast("test").is_some()); - origin.consumer.assert_next_wait(); + assert!(consumer.consume_broadcast("test").is_some()); + consumer.assert_next_wait(); // Drop the active, we should reannounce. - drop(broadcast3.producer); + drop(broadcast3); // Wait for the async task to run. tokio::time::sleep(tokio::time::Duration::from_millis(1)).await; - assert!(origin.consumer.consume_broadcast("test").is_some()); - origin.consumer.assert_next_none("test"); - origin.consumer.assert_next("test", &consumer1); + assert!(consumer.consume_broadcast("test").is_some()); + consumer.assert_next_none("test"); + consumer.assert_next("test", &consumer1); // Drop the final broadcast, we should unannounce. - drop(broadcast1.producer); + drop(broadcast1); // Wait for the async task to run. tokio::time::sleep(tokio::time::Duration::from_millis(1)).await; - assert!(origin.consumer.consume_broadcast("test").is_none()); + assert!(consumer.consume_broadcast("test").is_none()); - origin.consumer.assert_next_none("test"); - origin.consumer.assert_next_wait(); + consumer.assert_next_none("test"); + consumer.assert_next_wait(); } #[tokio::test] @@ -729,22 +741,22 @@ mod tests { let broadcast1 = Broadcast::produce(); let broadcast2 = Broadcast::produce(); - origin.producer.publish_broadcast("test", broadcast1.consumer); - origin.producer.publish_broadcast("test", broadcast2.consumer); - assert!(origin.consumer.consume_broadcast("test").is_some()); + origin.publish_broadcast("test", broadcast1.consume()); + origin.publish_broadcast("test", broadcast2.consume()); + assert!(origin.consume_broadcast("test").is_some()); // This is harder, dropping the new broadcast first. - drop(broadcast2.producer); + drop(broadcast2); // Wait for the cleanup async task to run. tokio::time::sleep(tokio::time::Duration::from_millis(1)).await; - assert!(origin.consumer.consume_broadcast("test").is_some()); + assert!(origin.consume_broadcast("test").is_some()); - drop(broadcast1.producer); + drop(broadcast1); // Wait for the cleanup async task to run. tokio::time::sleep(tokio::time::Duration::from_millis(1)).await; - assert!(origin.consumer.consume_broadcast("test").is_none()); + assert!(origin.consume_broadcast("test").is_none()); } #[tokio::test] @@ -753,91 +765,91 @@ mod tests { let broadcast = Broadcast::produce(); // Ensure it doesn't crash. - origin.producer.publish_broadcast("test", broadcast.producer.consume()); - origin.producer.publish_broadcast("test", broadcast.producer.consume()); + origin.publish_broadcast("test", broadcast.consume()); + origin.publish_broadcast("test", broadcast.consume()); - assert!(origin.consumer.consume_broadcast("test").is_some()); + assert!(origin.consume_broadcast("test").is_some()); - drop(broadcast.producer); + drop(broadcast); // Wait for the async task to run. tokio::time::sleep(tokio::time::Duration::from_millis(1)).await; - assert!(origin.consumer.consume_broadcast("test").is_none()); + assert!(origin.consume_broadcast("test").is_none()); } // There was a tokio bug where only the first 127 broadcasts would be received instantly. #[tokio::test] #[should_panic] async fn test_128() { - let mut origin = Origin::produce(); + let origin = Origin::produce(); let broadcast = Broadcast::produce(); for i in 0..256 { - origin - .producer - .publish_broadcast(format!("test{i}"), broadcast.consumer.clone()); + origin.publish_broadcast(format!("test{i}"), broadcast.consume()); } + let mut consumer = origin.consume(); for i in 0..256 { - origin.consumer.assert_next(format!("test{i}"), &broadcast.consumer); + consumer.assert_next(format!("test{i}"), &broadcast.consume()); } } #[tokio::test] async fn test_128_fix() { - let mut origin = Origin::produce(); + let origin = Origin::produce(); let broadcast = Broadcast::produce(); for i in 0..256 { - origin - .producer - .publish_broadcast(format!("test{i}"), broadcast.consumer.clone()); + origin.publish_broadcast(format!("test{i}"), broadcast.consume()); } + let mut consumer = origin.consume(); for i in 0..256 { // try_next does not have the same issue because it's synchronous. - origin.consumer.assert_try_next(format!("test{i}"), &broadcast.consumer); + consumer.assert_try_next(format!("test{i}"), &broadcast.consume()); } } #[tokio::test] async fn test_with_root_basic() { - let mut origin = Origin::produce(); + let origin = Origin::produce(); let broadcast = Broadcast::produce(); // Create a producer with root "/foo" - let foo_producer = origin.producer.with_root("foo").expect("should create root"); + let foo_producer = origin.with_root("foo").expect("should create root"); assert_eq!(foo_producer.root().as_str(), "foo"); // When publishing to "bar/baz", it should actually publish to "foo/bar/baz" - assert!(foo_producer.publish_broadcast("bar/baz", broadcast.consumer.clone())); + assert!(foo_producer.publish_broadcast("bar/baz", broadcast.consume())); + let mut consumer = origin.consume(); // The original consumer should see the full path - origin.consumer.assert_next("foo/bar/baz", &broadcast.consumer); + consumer.assert_next("foo/bar/baz", &broadcast.consume()); // A consumer created from the rooted producer should see the stripped path let mut foo_consumer = foo_producer.consume(); - foo_consumer.assert_next("bar/baz", &broadcast.consumer); + foo_consumer.assert_next("bar/baz", &broadcast.consume()); } #[tokio::test] async fn test_with_root_nested() { - let mut origin = Origin::produce(); + let origin = Origin::produce(); let broadcast = Broadcast::produce(); // Create nested roots - let foo_producer = origin.producer.with_root("foo").expect("should create foo root"); + let foo_producer = origin.with_root("foo").expect("should create foo root"); let foo_bar_producer = foo_producer.with_root("bar").expect("should create bar root"); assert_eq!(foo_bar_producer.root().as_str(), "foo/bar"); // Publishing to "baz" should actually publish to "foo/bar/baz" - assert!(foo_bar_producer.publish_broadcast("baz", broadcast.consumer.clone())); + assert!(foo_bar_producer.publish_broadcast("baz", broadcast.consume())); + let mut consumer = origin.consume(); // The original consumer sees the full path - origin.consumer.assert_next("foo/bar/baz", &broadcast.consumer); + consumer.assert_next("foo/bar/baz", &broadcast.consume()); // Consumer from foo_bar_producer sees just "baz" let mut foo_bar_consumer = foo_bar_producer.consume(); - foo_bar_consumer.assert_next("baz", &broadcast.consumer); + foo_bar_consumer.assert_next("baz", &broadcast.consume()); } #[tokio::test] @@ -847,19 +859,18 @@ mod tests { // Create a producer that can only publish to "allowed" paths let limited_producer = origin - .producer .publish_only(&["allowed/path1".into(), "allowed/path2".into()]) .expect("should create limited producer"); // Should be able to publish to allowed paths - assert!(limited_producer.publish_broadcast("allowed/path1", broadcast.consumer.clone())); - assert!(limited_producer.publish_broadcast("allowed/path1/nested", broadcast.consumer.clone())); - assert!(limited_producer.publish_broadcast("allowed/path2", broadcast.consumer.clone())); + assert!(limited_producer.publish_broadcast("allowed/path1", broadcast.consume())); + assert!(limited_producer.publish_broadcast("allowed/path1/nested", broadcast.consume())); + assert!(limited_producer.publish_broadcast("allowed/path2", broadcast.consume())); // Should not be able to publish to disallowed paths - assert!(!limited_producer.publish_broadcast("notallowed", broadcast.consumer.clone())); - assert!(!limited_producer.publish_broadcast("allowed", broadcast.consumer.clone())); // Parent of allowed path - assert!(!limited_producer.publish_broadcast("other/path", broadcast.consumer.clone())); + assert!(!limited_producer.publish_broadcast("notallowed", broadcast.consume())); + assert!(!limited_producer.publish_broadcast("allowed", broadcast.consume())); // Parent of allowed path + assert!(!limited_producer.publish_broadcast("other/path", broadcast.consume())); } #[tokio::test] @@ -867,42 +878,36 @@ mod tests { let origin = Origin::produce(); // Creating a producer with no allowed paths should return None - assert!(origin.producer.publish_only(&[]).is_none()); + assert!(origin.publish_only(&[]).is_none()); } #[tokio::test] async fn test_consume_only_filters() { - let mut origin = Origin::produce(); + let origin = Origin::produce(); let broadcast1 = Broadcast::produce(); let broadcast2 = Broadcast::produce(); let broadcast3 = Broadcast::produce(); // Publish to different paths - origin - .producer - .publish_broadcast("allowed", broadcast1.consumer.clone()); - origin - .producer - .publish_broadcast("allowed/nested", broadcast2.consumer.clone()); - origin - .producer - .publish_broadcast("notallowed", broadcast3.consumer.clone()); + origin.publish_broadcast("allowed", broadcast1.consume()); + origin.publish_broadcast("allowed/nested", broadcast2.consume()); + origin.publish_broadcast("notallowed", broadcast3.consume()); // Create a consumer that only sees "allowed" paths let mut limited_consumer = origin - .consumer .consume_only(&["allowed".into()]) .expect("should create limited consumer"); // Should only receive broadcasts under "allowed" - limited_consumer.assert_next("allowed", &broadcast1.consumer); - limited_consumer.assert_next("allowed/nested", &broadcast2.consumer); + limited_consumer.assert_next("allowed", &broadcast1.consume()); + limited_consumer.assert_next("allowed/nested", &broadcast2.consume()); limited_consumer.assert_next_wait(); // Should not see "notallowed" - // Original consumer should see all - origin.consumer.assert_next("allowed", &broadcast1.consumer); - origin.consumer.assert_next("allowed/nested", &broadcast2.consumer); - origin.consumer.assert_next("notallowed", &broadcast3.consumer); + // Unscoped consumer should see all + let mut consumer = origin.consume(); + consumer.assert_next("allowed", &broadcast1.consume()); + consumer.assert_next("allowed/nested", &broadcast2.consume()); + consumer.assert_next("notallowed", &broadcast3.consume()); } #[tokio::test] @@ -912,34 +917,27 @@ mod tests { let broadcast2 = Broadcast::produce(); let broadcast3 = Broadcast::produce(); - origin - .producer - .publish_broadcast("foo/test", broadcast1.consumer.clone()); - origin - .producer - .publish_broadcast("bar/test", broadcast2.consumer.clone()); - origin - .producer - .publish_broadcast("baz/test", broadcast3.consumer.clone()); + origin.publish_broadcast("foo/test", broadcast1.consume()); + origin.publish_broadcast("bar/test", broadcast2.consume()); + origin.publish_broadcast("baz/test", broadcast3.consume()); // Consumer that only sees "foo" and "bar" paths let mut limited_consumer = origin - .consumer .consume_only(&["foo".into(), "bar".into()]) .expect("should create limited consumer"); - limited_consumer.assert_next("foo/test", &broadcast1.consumer); - limited_consumer.assert_next("bar/test", &broadcast2.consumer); + limited_consumer.assert_next("foo/test", &broadcast1.consume()); + limited_consumer.assert_next("bar/test", &broadcast2.consume()); limited_consumer.assert_next_wait(); // Should not see "baz/test" } #[tokio::test] async fn test_with_root_and_publish_only() { - let mut origin = Origin::produce(); + let origin = Origin::produce(); let broadcast = Broadcast::produce(); // User connects to /foo root - let foo_producer = origin.producer.with_root("foo").expect("should create foo root"); + let foo_producer = origin.with_root("foo").expect("should create foo root"); // Limit them to publish only to "bar" and "goop/pee" within /foo let limited_producer = foo_producer @@ -947,21 +945,22 @@ mod tests { .expect("should create limited producer"); // Should be able to publish to foo/bar and foo/goop/pee (but user sees as bar and goop/pee) - assert!(limited_producer.publish_broadcast("bar", broadcast.consumer.clone())); - assert!(limited_producer.publish_broadcast("bar/nested", broadcast.consumer.clone())); - assert!(limited_producer.publish_broadcast("goop/pee", broadcast.consumer.clone())); - assert!(limited_producer.publish_broadcast("goop/pee/nested", broadcast.consumer.clone())); + assert!(limited_producer.publish_broadcast("bar", broadcast.consume())); + assert!(limited_producer.publish_broadcast("bar/nested", broadcast.consume())); + assert!(limited_producer.publish_broadcast("goop/pee", broadcast.consume())); + assert!(limited_producer.publish_broadcast("goop/pee/nested", broadcast.consume())); // Should not be able to publish outside allowed paths - assert!(!limited_producer.publish_broadcast("baz", broadcast.consumer.clone())); - assert!(!limited_producer.publish_broadcast("goop", broadcast.consumer.clone())); // Parent of allowed - assert!(!limited_producer.publish_broadcast("goop/other", broadcast.consumer.clone())); + assert!(!limited_producer.publish_broadcast("baz", broadcast.consume())); + assert!(!limited_producer.publish_broadcast("goop", broadcast.consume())); // Parent of allowed + assert!(!limited_producer.publish_broadcast("goop/other", broadcast.consume())); // Original consumer sees full paths - origin.consumer.assert_next("foo/bar", &broadcast.consumer); - origin.consumer.assert_next("foo/bar/nested", &broadcast.consumer); - origin.consumer.assert_next("foo/goop/pee", &broadcast.consumer); - origin.consumer.assert_next("foo/goop/pee/nested", &broadcast.consumer); + let mut consumer = origin.consume(); + consumer.assert_next("foo/bar", &broadcast.consume()); + consumer.assert_next("foo/bar/nested", &broadcast.consume()); + consumer.assert_next("foo/goop/pee", &broadcast.consume()); + consumer.assert_next("foo/goop/pee/nested", &broadcast.consume()); } #[tokio::test] @@ -972,18 +971,12 @@ mod tests { let broadcast3 = Broadcast::produce(); // Publish broadcasts - origin - .producer - .publish_broadcast("foo/bar/test", broadcast1.consumer.clone()); - origin - .producer - .publish_broadcast("foo/goop/pee/test", broadcast2.consumer.clone()); - origin - .producer - .publish_broadcast("foo/other/test", broadcast3.consumer.clone()); + origin.publish_broadcast("foo/bar/test", broadcast1.consume()); + origin.publish_broadcast("foo/goop/pee/test", broadcast2.consume()); + origin.publish_broadcast("foo/other/test", broadcast3.consume()); // User connects to /foo root - let foo_producer = origin.producer.with_root("foo").expect("should create foo root"); + let foo_producer = origin.with_root("foo").expect("should create foo root"); // Create consumer limited to "bar" and "goop/pee" within /foo let mut limited_consumer = foo_producer @@ -991,8 +984,8 @@ mod tests { .expect("should create limited consumer"); // Should only see allowed paths (without foo prefix) - limited_consumer.assert_next("bar/test", &broadcast1.consumer); - limited_consumer.assert_next("goop/pee/test", &broadcast2.consumer); + limited_consumer.assert_next("bar/test", &broadcast1.consume()); + limited_consumer.assert_next("goop/pee/test", &broadcast2.consume()); limited_consumer.assert_next_wait(); // Should not see "other/test" } @@ -1002,7 +995,6 @@ mod tests { // First limit the producer to specific paths let limited_producer = origin - .producer .publish_only(&["allowed".into()]) .expect("should create limited producer"); @@ -1022,11 +1014,11 @@ mod tests { let broadcast = Broadcast::produce(); // Producer with root access (empty string means wildcard) - let root_producer = origin.producer.clone(); + let root_producer = origin.clone(); // Should be able to publish anywhere - assert!(root_producer.publish_broadcast("any/path", broadcast.consumer.clone())); - assert!(root_producer.publish_broadcast("other/path", broadcast.consumer.clone())); + assert!(root_producer.publish_broadcast("any/path", broadcast.consume())); + assert!(root_producer.publish_broadcast("other/path", broadcast.consume())); // Can create any root let foo_producer = root_producer.with_root("foo").expect("should create any root"); @@ -1039,30 +1031,26 @@ mod tests { let broadcast1 = Broadcast::produce(); let broadcast2 = Broadcast::produce(); - origin - .producer - .publish_broadcast("allowed/test", broadcast1.consumer.clone()); - origin - .producer - .publish_broadcast("notallowed/test", broadcast2.consumer.clone()); + origin.publish_broadcast("allowed/test", broadcast1.consume()); + origin.publish_broadcast("notallowed/test", broadcast2.consume()); // Create limited consumer let limited_consumer = origin - .consumer .consume_only(&["allowed".into()]) .expect("should create limited consumer"); // Should be able to get allowed broadcast let result = limited_consumer.consume_broadcast("allowed/test"); assert!(result.is_some()); - assert!(result.unwrap().is_clone(&broadcast1.consumer)); + assert!(result.unwrap().is_clone(&broadcast1.consume())); // Should not be able to get disallowed broadcast assert!(limited_consumer.consume_broadcast("notallowed/test").is_none()); // Original consumer can get both - assert!(origin.consumer.consume_broadcast("allowed/test").is_some()); - assert!(origin.consumer.consume_broadcast("notallowed/test").is_some()); + let consumer = origin.consume(); + assert!(consumer.consume_broadcast("allowed/test").is_some()); + assert!(consumer.consume_broadcast("notallowed/test").is_some()); } #[tokio::test] @@ -1072,19 +1060,18 @@ mod tests { // Create producer limited to "a/b/c" let limited_producer = origin - .producer .publish_only(&["a/b/c".into()]) .expect("should create limited producer"); // Should be able to publish to exact path and nested paths - assert!(limited_producer.publish_broadcast("a/b/c", broadcast.consumer.clone())); - assert!(limited_producer.publish_broadcast("a/b/c/d", broadcast.consumer.clone())); - assert!(limited_producer.publish_broadcast("a/b/c/d/e", broadcast.consumer.clone())); + assert!(limited_producer.publish_broadcast("a/b/c", broadcast.consume())); + assert!(limited_producer.publish_broadcast("a/b/c/d", broadcast.consume())); + assert!(limited_producer.publish_broadcast("a/b/c/d/e", broadcast.consume())); // Should not be able to publish to parent or sibling paths - assert!(!limited_producer.publish_broadcast("a", broadcast.consumer.clone())); - assert!(!limited_producer.publish_broadcast("a/b", broadcast.consumer.clone())); - assert!(!limited_producer.publish_broadcast("a/b/other", broadcast.consumer.clone())); + assert!(!limited_producer.publish_broadcast("a", broadcast.consume())); + assert!(!limited_producer.publish_broadcast("a/b", broadcast.consume())); + assert!(!limited_producer.publish_broadcast("a/b/other", broadcast.consume())); } #[tokio::test] @@ -1095,41 +1082,32 @@ mod tests { let broadcast3 = Broadcast::produce(); // Publish to different paths - origin - .producer - .publish_broadcast("foo/test", broadcast1.consumer.clone()); - origin - .producer - .publish_broadcast("bar/test", broadcast2.consumer.clone()); - origin - .producer - .publish_broadcast("baz/test", broadcast3.consumer.clone()); + origin.publish_broadcast("foo/test", broadcast1.consume()); + origin.publish_broadcast("bar/test", broadcast2.consume()); + origin.publish_broadcast("baz/test", broadcast3.consume()); // Create consumers with different permissions let mut foo_consumer = origin - .consumer .consume_only(&["foo".into()]) .expect("should create foo consumer"); let mut bar_consumer = origin - .consumer .consume_only(&["bar".into()]) .expect("should create bar consumer"); let mut foobar_consumer = origin - .consumer .consume_only(&["foo".into(), "bar".into()]) .expect("should create foobar consumer"); // Each consumer should only see their allowed paths - foo_consumer.assert_next("foo/test", &broadcast1.consumer); + foo_consumer.assert_next("foo/test", &broadcast1.consume()); foo_consumer.assert_next_wait(); - bar_consumer.assert_next("bar/test", &broadcast2.consumer); + bar_consumer.assert_next("bar/test", &broadcast2.consume()); bar_consumer.assert_next_wait(); - foobar_consumer.assert_next("foo/test", &broadcast1.consumer); - foobar_consumer.assert_next("bar/test", &broadcast2.consumer); + foobar_consumer.assert_next("foo/test", &broadcast1.consume()); + foobar_consumer.assert_next("bar/test", &broadcast2.consume()); foobar_consumer.assert_next_wait(); } @@ -1140,14 +1118,14 @@ mod tests { let broadcast2 = Broadcast::produce(); // User with root "demo" allowed to subscribe to "worm-node" and "foobar" - let demo_producer = origin.producer.with_root("demo").expect("should create demo root"); + let demo_producer = origin.with_root("demo").expect("should create demo root"); let limited_producer = demo_producer .publish_only(&["worm-node".into(), "foobar".into()]) .expect("should create limited producer"); // Publish some broadcasts - assert!(limited_producer.publish_broadcast("worm-node/test", broadcast1.consumer.clone())); - assert!(limited_producer.publish_broadcast("foobar/test", broadcast2.consumer.clone())); + assert!(limited_producer.publish_broadcast("worm-node/test", broadcast1.consume())); + assert!(limited_producer.publish_broadcast("foobar/test", broadcast2.consume())); // consume_only with empty prefix should keep the exact same "worm-node" and "foobar" nodes let mut consumer = limited_producer @@ -1155,8 +1133,8 @@ mod tests { .expect("should create consumer with empty prefix"); // Should still see both broadcasts - consumer.assert_next("worm-node/test", &broadcast1.consumer); - consumer.assert_next("foobar/test", &broadcast2.consumer); + consumer.assert_next("worm-node/test", &broadcast1.consume()); + consumer.assert_next("foobar/test", &broadcast2.consume()); consumer.assert_next_wait(); } @@ -1168,15 +1146,15 @@ mod tests { let broadcast3 = Broadcast::produce(); // User with root "demo" allowed to subscribe to "worm-node" and "foobar" - let demo_producer = origin.producer.with_root("demo").expect("should create demo root"); + let demo_producer = origin.with_root("demo").expect("should create demo root"); let limited_producer = demo_producer .publish_only(&["worm-node".into(), "foobar".into()]) .expect("should create limited producer"); // Publish broadcasts at different levels - assert!(limited_producer.publish_broadcast("worm-node", broadcast1.consumer.clone())); - assert!(limited_producer.publish_broadcast("worm-node/foo", broadcast2.consumer.clone())); - assert!(limited_producer.publish_broadcast("foobar/bar", broadcast3.consumer.clone())); + assert!(limited_producer.publish_broadcast("worm-node", broadcast1.consume())); + assert!(limited_producer.publish_broadcast("worm-node/foo", broadcast2.consume())); + assert!(limited_producer.publish_broadcast("foobar/bar", broadcast3.consume())); // Test 1: consume_only("worm-node") should result in a single "" node with contents of "worm-node" ONLY let mut worm_consumer = limited_producer @@ -1184,8 +1162,8 @@ mod tests { .expect("should create worm-node consumer"); // Should see worm-node content with paths stripped to "" - worm_consumer.assert_next("worm-node", &broadcast1.consumer); - worm_consumer.assert_next("worm-node/foo", &broadcast2.consumer); + worm_consumer.assert_next("worm-node", &broadcast1.consume()); + worm_consumer.assert_next("worm-node/foo", &broadcast2.consume()); worm_consumer.assert_next_wait(); // Should NOT see foobar content // Test 2: consume_only("worm-node/foo") should result in a "" node with contents of "worm-node/foo" @@ -1193,7 +1171,7 @@ mod tests { .consume_only(&["worm-node/foo".into()]) .expect("should create worm-node/foo consumer"); - foo_consumer.assert_next("worm-node/foo", &broadcast2.consumer); + foo_consumer.assert_next("worm-node/foo", &broadcast2.consume()); foo_consumer.assert_next_wait(); // Should NOT see other content } @@ -1206,14 +1184,13 @@ mod tests { // Producer with multiple allowed roots let limited_producer = origin - .producer .publish_only(&["app1".into(), "app2".into(), "shared".into()]) .expect("should create limited producer"); // Publish to each root - assert!(limited_producer.publish_broadcast("app1/data", broadcast1.consumer.clone())); - assert!(limited_producer.publish_broadcast("app2/config", broadcast2.consumer.clone())); - assert!(limited_producer.publish_broadcast("shared/resource", broadcast3.consumer.clone())); + assert!(limited_producer.publish_broadcast("app1/data", broadcast1.consume())); + assert!(limited_producer.publish_broadcast("app2/config", broadcast2.consume())); + assert!(limited_producer.publish_broadcast("shared/resource", broadcast3.consume())); // consume_only with empty prefix should maintain all roots let mut consumer = limited_producer @@ -1221,9 +1198,9 @@ mod tests { .expect("should create consumer with empty prefix"); // Should see all broadcasts from all roots - consumer.assert_next("app1/data", &broadcast1.consumer); - consumer.assert_next("app2/config", &broadcast2.consumer); - consumer.assert_next("shared/resource", &broadcast3.consumer); + consumer.assert_next("app1/data", &broadcast1.consume()); + consumer.assert_next("app2/config", &broadcast2.consume()); + consumer.assert_next("shared/resource", &broadcast3.consume()); consumer.assert_next_wait(); } @@ -1234,7 +1211,6 @@ mod tests { // Producer with specific allowed paths let limited_producer = origin - .producer .publish_only(&["services/api".into(), "services/web".into()]) .expect("should create limited producer"); @@ -1244,10 +1220,10 @@ mod tests { .expect("should create producer with empty prefix"); // Should still have the same publishing restrictions - assert!(same_producer.publish_broadcast("services/api", broadcast.consumer.clone())); - assert!(same_producer.publish_broadcast("services/web", broadcast.consumer.clone())); - assert!(!same_producer.publish_broadcast("services/db", broadcast.consumer.clone())); - assert!(!same_producer.publish_broadcast("other", broadcast.consumer.clone())); + assert!(same_producer.publish_broadcast("services/api", broadcast.consume())); + assert!(same_producer.publish_broadcast("services/web", broadcast.consume())); + assert!(!same_producer.publish_broadcast("services/db", broadcast.consume())); + assert!(!same_producer.publish_broadcast("other", broadcast.consume())); } #[tokio::test] @@ -1259,21 +1235,20 @@ mod tests { // Producer with broad permission let limited_producer = origin - .producer .publish_only(&["org".into()]) .expect("should create limited producer"); // Publish at various depths - assert!(limited_producer.publish_broadcast("org/team1/project1", broadcast1.consumer.clone())); - assert!(limited_producer.publish_broadcast("org/team1/project2", broadcast2.consumer.clone())); - assert!(limited_producer.publish_broadcast("org/team2/project1", broadcast3.consumer.clone())); + assert!(limited_producer.publish_broadcast("org/team1/project1", broadcast1.consume())); + assert!(limited_producer.publish_broadcast("org/team1/project2", broadcast2.consume())); + assert!(limited_producer.publish_broadcast("org/team2/project1", broadcast3.consume())); // Narrow down to team1 only let mut team1_consumer = limited_producer .consume_only(&["org/team2".into()]) .expect("should create team1 consumer"); - team1_consumer.assert_next("org/team2/project1", &broadcast3.consumer); + team1_consumer.assert_next("org/team2/project1", &broadcast3.consume()); team1_consumer.assert_next_wait(); // Should NOT see team1 content // Further narrow down to team1/project1 @@ -1282,7 +1257,7 @@ mod tests { .expect("should create project1 consumer"); // Should only see project1 content at root - project1_consumer.assert_next("org/team1/project1", &broadcast1.consumer); + project1_consumer.assert_next("org/team1/project1", &broadcast1.consume()); project1_consumer.assert_next_wait(); } @@ -1292,7 +1267,6 @@ mod tests { // Producer with specific allowed paths let limited_producer = origin - .producer .publish_only(&["allowed/path".into()]) .expect("should create limited producer"); @@ -1310,14 +1284,14 @@ mod tests { let broadcast2 = Broadcast::produce(); // Setup: user with root "demo" allowed to subscribe to specific paths - let demo_producer = origin.producer.with_root("demo").expect("should create demo root"); + let demo_producer = origin.with_root("demo").expect("should create demo root"); let user_producer = demo_producer .publish_only(&["worm-node".into(), "foobar".into()]) .expect("should create user producer"); // Publish some data - assert!(user_producer.publish_broadcast("worm-node/data", broadcast1.consumer.clone())); - assert!(user_producer.publish_broadcast("foobar", broadcast2.consumer.clone())); + assert!(user_producer.publish_broadcast("worm-node/data", broadcast1.consume())); + assert!(user_producer.publish_broadcast("foobar", broadcast2.consume())); // Key test: consume_only with "" should maintain access to allowed roots let mut consumer = user_producer @@ -1325,8 +1299,8 @@ mod tests { .expect("consume_only with empty prefix should not fail when user has specific permissions"); // Should still receive broadcasts from allowed paths - consumer.assert_next("worm-node/data", &broadcast1.consumer); - consumer.assert_next("foobar", &broadcast2.consumer); + consumer.assert_next("worm-node/data", &broadcast1.consume()); + consumer.assert_next("foobar", &broadcast2.consume()); consumer.assert_next_wait(); // Also test that we can still narrow the scope @@ -1334,7 +1308,7 @@ mod tests { .consume_only(&["worm-node".into()]) .expect("should be able to narrow scope to worm-node"); - narrow_consumer.assert_next("worm-node/data", &broadcast1.consumer); + narrow_consumer.assert_next("worm-node/data", &broadcast1.consume()); narrow_consumer.assert_next_wait(); // Should not see foobar } } diff --git a/rs/moq-lite/src/model/produce.rs b/rs/moq-lite/src/model/produce.rs deleted file mode 100644 index 0c2520649..000000000 --- a/rs/moq-lite/src/model/produce.rs +++ /dev/null @@ -1,10 +0,0 @@ -/// A named tuple of a producer and consumer for convenience. -/// -/// The producer and consumer may each be cloned as many times as you want. -/// However when the number of references reaches zero, the other will receive a signal to close. -/// A new consumer may be created at any time by calling the producer's `consume()` method. -#[derive(Clone)] -pub struct Produce { - pub producer: P, - pub consumer: C, -} diff --git a/rs/moq-lite/src/model/track.rs b/rs/moq-lite/src/model/track.rs index 79323635a..8d110ad78 100644 --- a/rs/moq-lite/src/model/track.rs +++ b/rs/moq-lite/src/model/track.rs @@ -14,7 +14,7 @@ use tokio::sync::watch; -use crate::{Error, Produce, Result}; +use crate::{Error, Result}; use super::{Group, GroupConsumer, GroupProducer}; @@ -37,10 +37,8 @@ impl Track { } } - pub fn produce(self) -> Produce { - let producer = TrackProducer::new(self); - let consumer = producer.consume(); - Produce { producer, consumer } + pub fn produce(self) -> TrackProducer { + TrackProducer::new(self) } } @@ -78,7 +76,7 @@ pub struct TrackProducer { } impl TrackProducer { - fn new(info: Track) -> Self { + pub fn new(info: Track) -> Self { Self { info, state: Default::default(), @@ -101,8 +99,8 @@ impl TrackProducer { /// /// If the sequence number is not the latest, this method will return None. pub fn create_group(&mut self, info: Group) -> Option { - let group = info.produce(); - self.insert_group(group.consumer).then_some(group.producer) + let group = info.producer(); + self.insert_group(group.consume()).then_some(group) } /// Create a new group with the next sequence number. @@ -116,11 +114,11 @@ impl TrackProducer { state.trim(now); let sequence = state.max_sequence.map_or(0, |sequence| sequence + 1); - let group = Group { sequence }.produce(); - state.groups.push_back((now, group.consumer)); + let group = Group { sequence }.producer(); + state.groups.push_back((now, group.consume())); state.max_sequence = Some(sequence); - producer = Some(group.producer); + producer = Some(group); true }); @@ -144,10 +142,12 @@ impl TrackProducer { /// Create a new consumer for the track. pub fn consume(&self) -> TrackConsumer { + let state = self.state.borrow(); TrackConsumer { info: self.info.clone(), state: self.state.subscribe(), - index: 0, + // Start at the latest group + index: state.offset + state.groups.len().saturating_sub(1), } } diff --git a/rs/moq-native/examples/chat.rs b/rs/moq-native/examples/chat.rs index b73f4d7a3..b3f1e2ec3 100644 --- a/rs/moq-native/examples/chat.rs +++ b/rs/moq-native/examples/chat.rs @@ -12,8 +12,8 @@ async fn main() -> anyhow::Result<()> { // This is a simple example of how you can concurrently run multiple tasks. // tokio::spawn works too. tokio::select! { - res = run_broadcast(origin.producer) => res, - res = run_session(origin.consumer) => res, + res = run_session(origin.consume()) => res, + res = run_broadcast(origin) => res, } } @@ -41,7 +41,7 @@ async fn run_broadcast(origin: moq_lite::OriginProducer) -> anyhow::Result<()> { // Create a track that we'll insert into the broadcast. // A track is a series of groups representing a live stream. - let mut track = broadcast.producer.create_track(moq_lite::Track { + let mut track = broadcast.create_track(moq_lite::Track { name: "chat".to_string(), priority: 0, }); @@ -49,7 +49,7 @@ async fn run_broadcast(origin: moq_lite::OriginProducer) -> anyhow::Result<()> { // NOTE: The path is empty because we're using the URL to scope the broadcast. // If you put "alice" here, it would be published as "anon/chat-example/alice". // OPTIONAL: We publish after inserting the track just to avoid a nearly impossible race condition. - origin.publish_broadcast("", broadcast.consumer); + origin.publish_broadcast("", broadcast.consume()); // Create a group. // Each group is independent and the newest group(s) will be prioritized. diff --git a/rs/moq-relay/src/cluster.rs b/rs/moq-relay/src/cluster.rs index 09650902a..ecfc95210 100644 --- a/rs/moq-relay/src/cluster.rs +++ b/rs/moq-relay/src/cluster.rs @@ -1,4 +1,4 @@ -use std::{collections::HashMap, path::PathBuf, sync::Arc}; +use std::{collections::HashMap, path::PathBuf}; use anyhow::Context; use moq_lite::{Broadcast, BroadcastConsumer, BroadcastProducer, Origin, OriginConsumer, OriginProducer}; @@ -57,16 +57,16 @@ pub struct Cluster { client: moq_native::Client, // Advertises ourselves as an origin to other nodes. - noop: moq_lite::Produce, + noop: BroadcastProducer, // Broadcasts announced by local clients (users). - pub primary: Arc>, + pub primary: OriginProducer, // Broadcasts announced by remote servers (cluster). - pub secondary: Arc>, + pub secondary: OriginProducer, // Broadcasts announced by local clients and remote servers. - pub combined: Arc>, + pub combined: OriginProducer, } impl Cluster { @@ -75,9 +75,9 @@ impl Cluster { config, client, noop: Broadcast::produce(), - primary: Arc::new(Origin::produce()), - secondary: Arc::new(Origin::produce()), - combined: Arc::new(Origin::produce()), + primary: Origin::produce(), + secondary: Origin::produce(), + combined: Origin::produce(), } } @@ -92,7 +92,7 @@ impl Cluster { }; // Scope the origin to our root. - let subscribe_origin = subscribe_origin.producer.with_root(&token.root)?; + let subscribe_origin = subscribe_origin.with_root(&token.root)?; subscribe_origin.consume_only(&token.subscribe) } @@ -104,15 +104,14 @@ impl Cluster { false => &self.primary, }; - let publish_origin = publish_origin.producer.with_root(&token.root)?; + let publish_origin = publish_origin.with_root(&token.root)?; publish_origin.publish_only(&token.publish) } pub fn get(&self, broadcast: &str) -> Option { self.primary - .consumer .consume_broadcast(broadcast) - .or_else(|| self.secondary.consumer.consume_broadcast(broadcast)) + .or_else(|| self.secondary.consume_broadcast(broadcast)) } pub async fn run(self) -> anyhow::Result<()> { @@ -133,14 +132,13 @@ impl Cluster { // Use with_root to automatically strip the prefix from announced paths. let origins = self .secondary - .producer .with_root(&self.config.prefix) .context("no authorized origins")?; // Announce ourselves as an origin to the root node. if let Some(myself) = self.config.node.as_ref() { tracing::info!(%myself, "announcing as leaf"); - origins.publish_broadcast(myself, self.noop.consumer.clone()); + origins.publish_broadcast(myself, self.noop.consume()); } // If the token is provided, read it from the disk and use it in the query parameter. @@ -150,7 +148,7 @@ impl Cluster { None => "".to_string(), }; - let noop = self.noop.consumer.clone(); + let noop = self.noop.consume(); // Despite returning a Result, we should NEVER return an Ok tokio::select! { @@ -171,8 +169,8 @@ impl Cluster { // Shovel broadcasts from the primary and secondary origins into the combined origin. async fn run_combined(self) -> anyhow::Result<()> { - let mut primary = self.primary.consumer.consume(); - let mut secondary = self.secondary.consumer.consume(); + let mut primary = self.primary.consume(); + let mut secondary = self.secondary.consume(); loop { let (name, broadcast) = tokio::select! { @@ -183,7 +181,7 @@ impl Cluster { }; if let Some(broadcast) = broadcast { - self.combined.producer.publish_broadcast(&name, broadcast); + self.combined.publish_broadcast(&name, broadcast); } } } @@ -265,8 +263,8 @@ impl Cluster { let session = self .client .clone() - .with_publish(self.primary.consumer.consume()) - .with_consume(self.secondary.producer.clone()) + .with_publish(self.primary.consume()) + .with_consume(self.secondary.clone()) .connect(url.clone()) .await .context("failed to connect to remote")?; From 47d495fa96610fd4dd2e38a7eee99f90f7bb38f5 Mon Sep 17 00:00:00 2001 From: Luke Curley Date: Wed, 28 Jan 2026 08:31:01 -0300 Subject: [PATCH 4/5] Close audio groups immediately. --- js/hang/src/container/legacy.ts | 3 +- rs/hang/src/import/aac.rs | 8 ++- rs/hang/src/import/fmp4.rs | 118 +++++++++++++++++++++++--------- rs/hang/src/import/opus.rs | 8 ++- rs/hang/src/model/frame.rs | 23 +++++++ rs/hang/src/model/track.rs | 14 +--- 6 files changed, 119 insertions(+), 55 deletions(-) diff --git a/js/hang/src/container/legacy.ts b/js/hang/src/container/legacy.ts index f4b4c738e..d3ab74a1c 100644 --- a/js/hang/src/container/legacy.ts +++ b/js/hang/src/container/legacy.ts @@ -209,8 +209,7 @@ export class Consumer { if (this.#active !== undefined && first.consumer.sequence <= this.#active) { this.#groups.shift(); - // TODO Debug why this fires? - // console.warn(`skipping slow group: ${first.consumer.sequence} < ${this.#groups[0]?.consumer.sequence}`); + console.warn(`skipping slow group: ${first.consumer.sequence} < ${this.#groups[0]?.consumer.sequence}`); first.consumer.close(); first.frames.length = 0; diff --git a/rs/hang/src/import/aac.rs b/rs/hang/src/import/aac.rs index d8f9a9c3f..240e1e970 100644 --- a/rs/hang/src/import/aac.rs +++ b/rs/hang/src/import/aac.rs @@ -7,7 +7,7 @@ use moq_lite as moq; /// AAC decoder, initialized via AudioSpecificConfig (variable length from ESDS box). pub struct Aac { broadcast: hang::BroadcastProducer, - track: Option, + track: Option, zero: Option, } @@ -119,7 +119,7 @@ impl Aac { let audio = catalog.insert_audio(track.info.name.clone(), config); audio.priority = 2; - self.track = Some(track.into()); + self.track = Some(track); Ok(()) } @@ -140,7 +140,9 @@ impl Aac { payload, }; - track.write(frame)?; + let mut group = track.append_group(); + frame.encode(&mut group)?; + group.close(); Ok(()) } diff --git a/rs/hang/src/import/fmp4.rs b/rs/hang/src/import/fmp4.rs index b2e58ef37..986897260 100644 --- a/rs/hang/src/import/fmp4.rs +++ b/rs/hang/src/import/fmp4.rs @@ -54,8 +54,16 @@ pub struct Fmp4 { moof_raw: Option, } +#[derive(PartialEq, Debug)] +enum TrackKind { + Video, + Audio, +} + struct Fmp4Track { - producer: hang::TrackProducer, + kind: TrackKind, + + producer: moq_lite::TrackProducer, // The current group being written, only used for passthrough mode. group: Option, @@ -71,8 +79,9 @@ struct Fmp4Track { } impl Fmp4Track { - fn new(producer: hang::TrackProducer) -> Self { + fn new(kind: TrackKind, producer: moq_lite::TrackProducer) -> Self { Self { + kind, producer, group: None, min_buffer: None, @@ -170,7 +179,7 @@ impl Fmp4 { let track_id = trak.tkhd.track_id; let handler = &trak.mdia.hdlr.handler; - let track = match handler.as_ref() { + let (kind, track) = match handler.as_ref() { b"vide" => { let config = self.init_video(trak)?; @@ -185,8 +194,7 @@ impl Fmp4 { // Record this track name created_video_tracks.push(track.name.clone()); - let track = self.broadcast.create_track(track); - hang::TrackProducer::new(track) + (TrackKind::Video, track) } b"soun" => { let config = self.init_audio(trak)?; @@ -202,14 +210,15 @@ impl Fmp4 { // Record this track name created_audio_tracks.push(track.name.clone()); - let track = self.broadcast.create_track(track); - hang::TrackProducer::new(track) + (TrackKind::Audio, track) } b"sbtl" => anyhow::bail!("subtitle tracks are not supported"), handler => anyhow::bail!("unknown track type: {:?}", handler), }; - self.tracks.insert(track_id, Fmp4Track::new(track)); + let track = self.broadcast.create_track(track); + + self.tracks.insert(track_id, Fmp4Track::new(kind, track)); } self.moov = Some(moov); @@ -524,21 +533,25 @@ impl Fmp4 { anyhow::bail!("invalid data offset"); } - let keyframe = if trak.mdia.hdlr.handler == b"vide".into() { - // https://chromium.googlesource.com/chromium/src/media/+/master/formats/mp4/track_run_iterator.cc#177 - let keyframe = (flags >> 24) & 0x3 == 0x2; // kSampleDependsOnNoOther - let non_sync = (flags >> 16) & 0x1 == 0x1; // kSampleIsNonSyncSample - - keyframe && !non_sync - } else { - // Audio frames are always keyframes. - true + let keyframe = match track.kind { + TrackKind::Video => { + // https://chromium.googlesource.com/chromium/src/media/+/master/formats/mp4/track_run_iterator.cc#177 + let keyframe = (flags >> 24) & 0x3 == 0x2; // kSampleDependsOnNoOther + let non_sync = (flags >> 16) & 0x1 == 0x1; // kSampleIsNonSyncSample + + keyframe && !non_sync + } + TrackKind::Audio => { + // Audio frames are always keyframes. + // TODO: Optionally bundle audio frames into groups to + true + } }; contains_keyframe |= keyframe; if !self.config.passthrough { - // TODO Avoid a copy if mp4-atom uses Bytes? + // TODO Avoid a copy if mp4-atom switches to using Bytes? let payload = Bytes::copy_from_slice(&mdat.data[offset..(offset + size)]); let frame = hang::Frame { @@ -546,7 +559,32 @@ impl Fmp4 { keyframe, payload: payload.into(), }; - track.producer.write(frame)?; + + // NOTE: We inline some of the hang::TrackProducer logic so we get more control over the group creation. + // This is completely optional; you can use hang::TrackProducer if you want. + let mut group = match track.kind { + // If this is a video keyframe, we create a new group. + TrackKind::Video if keyframe => { + if let Some(group) = track.group.take() { + // Close the previous group if it exists. + group.close(); + } + track.producer.append_group() + } + // If this is a video non-keyframe, we use the previous group. + TrackKind::Video => track.group.take().context("no keyframe at start")?, + TrackKind::Audio => { + // For audio, we send the entire fragment as a single group. + // This is an optimization to avoid a burst of tiny groups, possibly hitting MAX_STREAMS, when it doesn't really matter. + // ex. 2s of audio: 1 group instead of 90 groups. + // Technically, individual groups are better for skipping, but it's a moot point if fMP4 is introducing so much latency. + track.group.take().unwrap_or_else(|| track.producer.append_group()) + } + }; + + // Encode the frame and update the group. + frame.encode(&mut group)?; + track.group = Some(group); } if timestamp >= max_timestamp.unwrap_or(Timestamp::ZERO) { @@ -577,7 +615,7 @@ impl Fmp4 { group.close(); } - track.producer.inner.append_group() + track.producer.append_group() } else { track.group.take().context("no keyframe at start")? }; @@ -594,6 +632,11 @@ impl Fmp4 { frame.close(); track.group = Some(group); + } else if track.kind == TrackKind::Audio { + // Close the audio group if it exists. + if let Some(group) = track.group.take() { + group.close(); + } } if let (Some(min), Some(max), Some(min_duration)) = (min_timestamp, max_timestamp, track.min_duration) { @@ -608,17 +651,23 @@ impl Fmp4 { // Update the catalog with the new min_buffer let mut catalog = self.broadcast.catalog.lock(); - // We're lazy and don't keep track if this track is for audio or video, so just try to update both. - if let Some(video) = catalog.video.as_mut() - && let Some(config) = video.renditions.get_mut(&track.producer.info.name) - { - config.min_buffer = Some(min_buffer.convert()?); - } - - if let Some(audio) = catalog.audio.as_mut() - && let Some(config) = audio.renditions.get_mut(&track.producer.info.name) - { - config.min_buffer = Some(min_buffer.convert()?); + match track.kind { + TrackKind::Video => { + let video = catalog.video.as_mut().context("missing video")?; + let config = video + .renditions + .get_mut(&track.producer.info.name) + .context("missing video config")?; + config.min_buffer = Some(min_buffer.convert()?); + } + TrackKind::Audio => { + let audio = catalog.audio.as_mut().context("missing audio")?; + let config = audio + .renditions + .get_mut(&track.producer.info.name) + .context("missing audio config")?; + config.min_buffer = Some(min_buffer.convert()?); + } } } } @@ -633,9 +682,10 @@ impl Drop for Fmp4 { let mut catalog = self.broadcast.catalog.lock(); for track in self.tracks.values() { - // We're too lazy to keep track of if this track is for audio or video, so we just remove both. - catalog.remove_video(&track.producer.info.name); - catalog.remove_audio(&track.producer.info.name); + match track.kind { + TrackKind::Video => catalog.remove_video(&track.producer.info.name), + TrackKind::Audio => catalog.remove_audio(&track.producer.info.name), + } } } } diff --git a/rs/hang/src/import/opus.rs b/rs/hang/src/import/opus.rs index a5acf71ad..cfbf17159 100644 --- a/rs/hang/src/import/opus.rs +++ b/rs/hang/src/import/opus.rs @@ -7,7 +7,7 @@ use moq_lite as moq; /// Opus decoder, initialized via a OpusHead. Does not support Ogg. pub struct Opus { broadcast: hang::BroadcastProducer, - track: Option, + track: Option, zero: Option, } @@ -65,7 +65,7 @@ impl Opus { let audio = catalog.insert_audio(track.info.name.clone(), config); audio.priority = 2; - self.track = Some(track.into()); + self.track = Some(track); Ok(()) } @@ -86,7 +86,9 @@ impl Opus { payload, }; - track.write(frame)?; + let mut group = track.append_group(); + frame.encode(&mut group)?; + group.close(); Ok(()) } diff --git a/rs/hang/src/model/frame.rs b/rs/hang/src/model/frame.rs index 6432dc05c..2a770306b 100644 --- a/rs/hang/src/model/frame.rs +++ b/rs/hang/src/model/frame.rs @@ -1,6 +1,10 @@ +use bytes::{Buf, BytesMut}; use derive_more::Debug; pub use buf_list::BufList; +use moq_lite::{coding::Encode, lite}; + +use crate::Error; pub type Timestamp = moq_lite::Timescale<1_000_000>; @@ -32,3 +36,22 @@ pub struct Frame { #[debug("{} bytes", payload.num_bytes())] pub payload: BufList, } + +impl Frame { + /// Encode the frame to the given group. + pub fn encode(&self, group: &mut moq_lite::GroupProducer) -> Result<(), Error> { + let mut header = BytesMut::new(); + self.timestamp.encode(&mut header, lite::Version::Draft02); + + let size = header.len() + self.payload.remaining(); + + let mut chunked = group.create_frame(size.into()); + chunked.write_chunk(header.freeze()); + for chunk in &self.payload { + chunked.write_chunk(chunk.clone()); + } + chunked.close(); + + Ok(()) + } +} diff --git a/rs/hang/src/model/track.rs b/rs/hang/src/model/track.rs index 3fae96be0..8e9e103ec 100644 --- a/rs/hang/src/model/track.rs +++ b/rs/hang/src/model/track.rs @@ -5,8 +5,6 @@ use crate::Error; use crate::model::{Frame, GroupConsumer, Timestamp}; use futures::{StreamExt, stream::FuturesUnordered}; -use moq_lite::{coding::*, lite}; - /// A producer for media tracks. /// /// This wraps a `moq_lite::TrackProducer` and adds hang-specific functionality @@ -47,9 +45,6 @@ impl TrackProducer { pub fn write(&mut self, frame: Frame) -> Result<(), Error> { tracing::trace!(?frame, "write frame"); - let mut header = BytesMut::new(); - frame.timestamp.encode(&mut header, lite::Version::Draft02); - if frame.keyframe { if let Some(group) = self.group.take() { group.close(); @@ -73,14 +68,7 @@ impl TrackProducer { None => return Err(Error::MissingKeyframe), }; - let size = header.len() + frame.payload.remaining(); - - let mut chunked = group.create_frame(size.into()); - chunked.write_chunk(header.freeze()); - for chunk in frame.payload { - chunked.write_chunk(chunk); - } - chunked.close(); + frame.encode(&mut group)?; self.group.replace(group); From 9d34a897edfa5eac36e4e590d802816722fe59a0 Mon Sep 17 00:00:00 2001 From: Luke Curley Date: Thu, 29 Jan 2026 14:37:10 -0300 Subject: [PATCH 5/5] PR comment. --- rs/moq-lite/src/lite/publisher.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/rs/moq-lite/src/lite/publisher.rs b/rs/moq-lite/src/lite/publisher.rs index 379e172c4..4f0c90c65 100644 --- a/rs/moq-lite/src/lite/publisher.rs +++ b/rs/moq-lite/src/lite/publisher.rs @@ -220,13 +220,12 @@ impl Publisher { loop { let group = tokio::select! { - biased; - Some(group) = track.next_group().transpose() => group, // Poll all active group futures; never matches but keeps them running. true = async { while tasks.next().await.is_some() {} false } => unreachable!(), + Some(group) = track.next_group().transpose() => group, else => return Ok(()), }?;