From 39d4fd8bccf3a13d81e3b79b8b3f195ff75e1356 Mon Sep 17 00:00:00 2001 From: Luke Curley Date: Wed, 28 Jan 2026 08:31:01 -0300 Subject: [PATCH] Close audio groups immediately. --- rs/hang/src/import/aac.rs | 8 ++- rs/hang/src/import/fmp4.rs | 120 ++++++++++++++++++++++++++----------- rs/hang/src/import/opus.rs | 8 ++- rs/hang/src/model/frame.rs | 23 +++++++ rs/hang/src/model/track.rs | 14 +---- 5 files changed, 118 insertions(+), 55 deletions(-) diff --git a/rs/hang/src/import/aac.rs b/rs/hang/src/import/aac.rs index 844d458f3..745b4c9bd 100644 --- a/rs/hang/src/import/aac.rs +++ b/rs/hang/src/import/aac.rs @@ -7,7 +7,7 @@ use moq_lite as moq; /// AAC decoder, initialized via AudioSpecificConfig (variable length from ESDS box). pub struct Aac { broadcast: hang::BroadcastProducer, - track: Option, + track: Option, zero: Option, } @@ -120,7 +120,7 @@ impl Aac { let audio = catalog.insert_audio(track.producer.info.name.clone(), config); audio.priority = 2; - self.track = Some(track.producer.into()); + self.track = Some(track.producer); Ok(()) } @@ -141,7 +141,9 @@ impl Aac { payload, }; - track.write(frame)?; + let mut group = track.append_group(); + frame.encode(&mut group)?; + group.close(); Ok(()) } diff --git a/rs/hang/src/import/fmp4.rs b/rs/hang/src/import/fmp4.rs index 4be9e9f73..986897260 100644 --- a/rs/hang/src/import/fmp4.rs +++ b/rs/hang/src/import/fmp4.rs @@ -54,8 +54,16 @@ pub struct Fmp4 { moof_raw: Option, } +#[derive(PartialEq, Debug)] +enum TrackKind { + Video, + Audio, +} + struct Fmp4Track { - producer: hang::TrackProducer, + kind: TrackKind, + + producer: moq_lite::TrackProducer, // The current group being written, only used for passthrough mode. group: Option, @@ -71,8 +79,9 @@ struct Fmp4Track { } impl Fmp4Track { - fn new(producer: hang::TrackProducer) -> Self { + fn new(kind: TrackKind, producer: moq_lite::TrackProducer) -> Self { Self { + kind, producer, group: None, min_buffer: None, @@ -170,7 +179,7 @@ impl Fmp4 { let track_id = trak.tkhd.track_id; let handler = &trak.mdia.hdlr.handler; - let track = match handler.as_ref() { + let (kind, track) = match handler.as_ref() { b"vide" => { let config = self.init_video(trak)?; @@ -185,9 +194,7 @@ impl Fmp4 { // Record this track name created_video_tracks.push(track.name.clone()); - let track = track.produce(); - self.broadcast.insert_track(track.consumer); - hang::TrackProducer::new(track.producer) + (TrackKind::Video, track) } b"soun" => { let config = self.init_audio(trak)?; @@ -203,15 +210,15 @@ impl Fmp4 { // Record this track name created_audio_tracks.push(track.name.clone()); - let track = track.produce(); - self.broadcast.insert_track(track.consumer); - hang::TrackProducer::new(track.producer) + (TrackKind::Audio, track) } b"sbtl" => anyhow::bail!("subtitle tracks are not supported"), handler => anyhow::bail!("unknown track type: {:?}", handler), }; - self.tracks.insert(track_id, Fmp4Track::new(track)); + let track = self.broadcast.create_track(track); + + self.tracks.insert(track_id, Fmp4Track::new(kind, track)); } self.moov = Some(moov); @@ -526,21 +533,25 @@ impl Fmp4 { anyhow::bail!("invalid data offset"); } - let keyframe = if trak.mdia.hdlr.handler == b"vide".into() { - // https://chromium.googlesource.com/chromium/src/media/+/master/formats/mp4/track_run_iterator.cc#177 - let keyframe = (flags >> 24) & 0x3 == 0x2; // kSampleDependsOnNoOther - let non_sync = (flags >> 16) & 0x1 == 0x1; // kSampleIsNonSyncSample - - keyframe && !non_sync - } else { - // Audio frames are always keyframes. - true + let keyframe = match track.kind { + TrackKind::Video => { + // https://chromium.googlesource.com/chromium/src/media/+/master/formats/mp4/track_run_iterator.cc#177 + let keyframe = (flags >> 24) & 0x3 == 0x2; // kSampleDependsOnNoOther + let non_sync = (flags >> 16) & 0x1 == 0x1; // kSampleIsNonSyncSample + + keyframe && !non_sync + } + TrackKind::Audio => { + // Audio frames are always keyframes. + // TODO: Optionally bundle audio frames into groups to + true + } }; contains_keyframe |= keyframe; if !self.config.passthrough { - // TODO Avoid a copy if mp4-atom uses Bytes? + // TODO Avoid a copy if mp4-atom switches to using Bytes? let payload = Bytes::copy_from_slice(&mdat.data[offset..(offset + size)]); let frame = hang::Frame { @@ -548,7 +559,32 @@ impl Fmp4 { keyframe, payload: payload.into(), }; - track.producer.write(frame)?; + + // NOTE: We inline some of the hang::TrackProducer logic so we get more control over the group creation. + // This is completely optional; you can use hang::TrackProducer if you want. + let mut group = match track.kind { + // If this is a video keyframe, we create a new group. + TrackKind::Video if keyframe => { + if let Some(group) = track.group.take() { + // Close the previous group if it exists. + group.close(); + } + track.producer.append_group() + } + // If this is a video non-keyframe, we use the previous group. + TrackKind::Video => track.group.take().context("no keyframe at start")?, + TrackKind::Audio => { + // For audio, we send the entire fragment as a single group. + // This is an optimization to avoid a burst of tiny groups, possibly hitting MAX_STREAMS, when it doesn't really matter. + // ex. 2s of audio: 1 group instead of 90 groups. + // Technically, individual groups are better for skipping, but it's a moot point if fMP4 is introducing so much latency. + track.group.take().unwrap_or_else(|| track.producer.append_group()) + } + }; + + // Encode the frame and update the group. + frame.encode(&mut group)?; + track.group = Some(group); } if timestamp >= max_timestamp.unwrap_or(Timestamp::ZERO) { @@ -579,7 +615,7 @@ impl Fmp4 { group.close(); } - track.producer.inner.append_group() + track.producer.append_group() } else { track.group.take().context("no keyframe at start")? }; @@ -596,6 +632,11 @@ impl Fmp4 { frame.close(); track.group = Some(group); + } else if track.kind == TrackKind::Audio { + // Close the audio group if it exists. + if let Some(group) = track.group.take() { + group.close(); + } } if let (Some(min), Some(max), Some(min_duration)) = (min_timestamp, max_timestamp, track.min_duration) { @@ -610,17 +651,23 @@ impl Fmp4 { // Update the catalog with the new min_buffer let mut catalog = self.broadcast.catalog.lock(); - // We're lazy and don't keep track if this track is for audio or video, so just try to update both. - if let Some(video) = catalog.video.as_mut() - && let Some(config) = video.renditions.get_mut(&track.producer.info.name) - { - config.min_buffer = Some(min_buffer.convert()?); - } - - if let Some(audio) = catalog.audio.as_mut() - && let Some(config) = audio.renditions.get_mut(&track.producer.info.name) - { - config.min_buffer = Some(min_buffer.convert()?); + match track.kind { + TrackKind::Video => { + let video = catalog.video.as_mut().context("missing video")?; + let config = video + .renditions + .get_mut(&track.producer.info.name) + .context("missing video config")?; + config.min_buffer = Some(min_buffer.convert()?); + } + TrackKind::Audio => { + let audio = catalog.audio.as_mut().context("missing audio")?; + let config = audio + .renditions + .get_mut(&track.producer.info.name) + .context("missing audio config")?; + config.min_buffer = Some(min_buffer.convert()?); + } } } } @@ -635,9 +682,10 @@ impl Drop for Fmp4 { let mut catalog = self.broadcast.catalog.lock(); for track in self.tracks.values() { - // We're too lazy to keep track of if this track is for audio or video, so we just remove both. - catalog.remove_video(&track.producer.info.name); - catalog.remove_audio(&track.producer.info.name); + match track.kind { + TrackKind::Video => catalog.remove_video(&track.producer.info.name), + TrackKind::Audio => catalog.remove_audio(&track.producer.info.name), + } } } } diff --git a/rs/hang/src/import/opus.rs b/rs/hang/src/import/opus.rs index 99583d1ea..ee0ea90d0 100644 --- a/rs/hang/src/import/opus.rs +++ b/rs/hang/src/import/opus.rs @@ -7,7 +7,7 @@ use moq_lite as moq; /// Opus decoder, initialized via a OpusHead. Does not support Ogg. pub struct Opus { broadcast: hang::BroadcastProducer, - track: Option, + track: Option, zero: Option, } @@ -66,7 +66,7 @@ impl Opus { let audio = catalog.insert_audio(track.producer.info.name.clone(), config); audio.priority = 2; - self.track = Some(track.producer.into()); + self.track = Some(track.producer); Ok(()) } @@ -87,7 +87,9 @@ impl Opus { payload, }; - track.write(frame)?; + let mut group = track.append_group(); + frame.encode(&mut group)?; + group.close(); Ok(()) } diff --git a/rs/hang/src/model/frame.rs b/rs/hang/src/model/frame.rs index 6432dc05c..2a770306b 100644 --- a/rs/hang/src/model/frame.rs +++ b/rs/hang/src/model/frame.rs @@ -1,6 +1,10 @@ +use bytes::{Buf, BytesMut}; use derive_more::Debug; pub use buf_list::BufList; +use moq_lite::{coding::Encode, lite}; + +use crate::Error; pub type Timestamp = moq_lite::Timescale<1_000_000>; @@ -32,3 +36,22 @@ pub struct Frame { #[debug("{} bytes", payload.num_bytes())] pub payload: BufList, } + +impl Frame { + /// Encode the frame to the given group. + pub fn encode(&self, group: &mut moq_lite::GroupProducer) -> Result<(), Error> { + let mut header = BytesMut::new(); + self.timestamp.encode(&mut header, lite::Version::Draft02); + + let size = header.len() + self.payload.remaining(); + + let mut chunked = group.create_frame(size.into()); + chunked.write_chunk(header.freeze()); + for chunk in &self.payload { + chunked.write_chunk(chunk.clone()); + } + chunked.close(); + + Ok(()) + } +} diff --git a/rs/hang/src/model/track.rs b/rs/hang/src/model/track.rs index 3fae96be0..8e9e103ec 100644 --- a/rs/hang/src/model/track.rs +++ b/rs/hang/src/model/track.rs @@ -5,8 +5,6 @@ use crate::Error; use crate::model::{Frame, GroupConsumer, Timestamp}; use futures::{StreamExt, stream::FuturesUnordered}; -use moq_lite::{coding::*, lite}; - /// A producer for media tracks. /// /// This wraps a `moq_lite::TrackProducer` and adds hang-specific functionality @@ -47,9 +45,6 @@ impl TrackProducer { pub fn write(&mut self, frame: Frame) -> Result<(), Error> { tracing::trace!(?frame, "write frame"); - let mut header = BytesMut::new(); - frame.timestamp.encode(&mut header, lite::Version::Draft02); - if frame.keyframe { if let Some(group) = self.group.take() { group.close(); @@ -73,14 +68,7 @@ impl TrackProducer { None => return Err(Error::MissingKeyframe), }; - let size = header.len() + frame.payload.remaining(); - - let mut chunked = group.create_frame(size.into()); - chunked.write_chunk(header.freeze()); - for chunk in frame.payload { - chunked.write_chunk(chunk); - } - chunked.close(); + frame.encode(&mut group)?; self.group.replace(group);