Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions rs/hang/src/import/aac.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use moq_lite as moq;
/// AAC decoder, initialized via AudioSpecificConfig (variable length from ESDS box).
pub struct Aac {
broadcast: hang::BroadcastProducer,
track: Option<hang::TrackProducer>,
track: Option<moq_lite::TrackProducer>,
zero: Option<tokio::time::Instant>,
}

Expand Down Expand Up @@ -120,7 +120,7 @@ impl Aac {
let audio = catalog.insert_audio(track.producer.info.name.clone(), config);
audio.priority = 2;

self.track = Some(track.producer.into());
self.track = Some(track.producer);

Ok(())
}
Expand All @@ -141,7 +141,9 @@ impl Aac {
payload,
};

track.write(frame)?;
let mut group = track.append_group();
frame.encode(&mut group)?;
group.close();

Ok(())
}
Expand Down
120 changes: 84 additions & 36 deletions rs/hang/src/import/fmp4.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,16 @@ pub struct Fmp4 {
moof_raw: Option<Bytes>,
}

#[derive(PartialEq, Debug)]
enum TrackKind {
Video,
Audio,
}

struct Fmp4Track {
producer: hang::TrackProducer,
kind: TrackKind,

producer: moq_lite::TrackProducer,

// The current group being written, only used for passthrough mode.
group: Option<moq_lite::GroupProducer>,
Expand All @@ -71,8 +79,9 @@ struct Fmp4Track {
}

impl Fmp4Track {
fn new(producer: hang::TrackProducer) -> Self {
fn new(kind: TrackKind, producer: moq_lite::TrackProducer) -> Self {
Self {
kind,
producer,
group: None,
min_buffer: None,
Expand Down Expand Up @@ -170,7 +179,7 @@ impl Fmp4 {
let track_id = trak.tkhd.track_id;
let handler = &trak.mdia.hdlr.handler;

let track = match handler.as_ref() {
let (kind, track) = match handler.as_ref() {
b"vide" => {
let config = self.init_video(trak)?;

Expand All @@ -185,9 +194,7 @@ impl Fmp4 {
// Record this track name
created_video_tracks.push(track.name.clone());

let track = track.produce();
self.broadcast.insert_track(track.consumer);
hang::TrackProducer::new(track.producer)
(TrackKind::Video, track)
}
b"soun" => {
let config = self.init_audio(trak)?;
Expand All @@ -203,15 +210,15 @@ impl Fmp4 {
// Record this track name
created_audio_tracks.push(track.name.clone());

let track = track.produce();
self.broadcast.insert_track(track.consumer);
hang::TrackProducer::new(track.producer)
(TrackKind::Audio, track)
}
b"sbtl" => anyhow::bail!("subtitle tracks are not supported"),
handler => anyhow::bail!("unknown track type: {:?}", handler),
};

self.tracks.insert(track_id, Fmp4Track::new(track));
let track = self.broadcast.create_track(track);

self.tracks.insert(track_id, Fmp4Track::new(kind, track));
}

self.moov = Some(moov);
Expand Down Expand Up @@ -526,29 +533,58 @@ impl Fmp4 {
anyhow::bail!("invalid data offset");
}

let keyframe = if trak.mdia.hdlr.handler == b"vide".into() {
// https://chromium.googlesource.com/chromium/src/media/+/master/formats/mp4/track_run_iterator.cc#177
let keyframe = (flags >> 24) & 0x3 == 0x2; // kSampleDependsOnNoOther
let non_sync = (flags >> 16) & 0x1 == 0x1; // kSampleIsNonSyncSample

keyframe && !non_sync
} else {
// Audio frames are always keyframes.
true
let keyframe = match track.kind {
TrackKind::Video => {
// https://chromium.googlesource.com/chromium/src/media/+/master/formats/mp4/track_run_iterator.cc#177
let keyframe = (flags >> 24) & 0x3 == 0x2; // kSampleDependsOnNoOther
let non_sync = (flags >> 16) & 0x1 == 0x1; // kSampleIsNonSyncSample

keyframe && !non_sync
}
TrackKind::Audio => {
// Audio frames are always keyframes.
// TODO: Optionally bundle audio frames into groups to
true
}
};

contains_keyframe |= keyframe;

if !self.config.passthrough {
// TODO Avoid a copy if mp4-atom uses Bytes?
// TODO Avoid a copy if mp4-atom switches to using Bytes?
let payload = Bytes::copy_from_slice(&mdat.data[offset..(offset + size)]);

let frame = hang::Frame {
timestamp,
keyframe,
payload: payload.into(),
};
track.producer.write(frame)?;

// NOTE: We inline some of the hang::TrackProducer logic so we get more control over the group creation.
// This is completely optional; you can use hang::TrackProducer if you want.
let mut group = match track.kind {
// If this is a video keyframe, we create a new group.
TrackKind::Video if keyframe => {
if let Some(group) = track.group.take() {
// Close the previous group if it exists.
group.close();
}
track.producer.append_group()
}
// If this is a video non-keyframe, we use the previous group.
TrackKind::Video => track.group.take().context("no keyframe at start")?,
TrackKind::Audio => {
// For audio, we send the entire fragment as a single group.
// This is an optimization to avoid a burst of tiny groups, possibly hitting MAX_STREAMS, when it doesn't really matter.
// ex. 2s of audio: 1 group instead of 90 groups.
// Technically, individual groups are better for skipping, but it's a moot point if fMP4 is introducing so much latency.
track.group.take().unwrap_or_else(|| track.producer.append_group())
}
};

// Encode the frame and update the group.
frame.encode(&mut group)?;
track.group = Some(group);
}

if timestamp >= max_timestamp.unwrap_or(Timestamp::ZERO) {
Expand Down Expand Up @@ -579,7 +615,7 @@ impl Fmp4 {
group.close();
}

track.producer.inner.append_group()
track.producer.append_group()
} else {
track.group.take().context("no keyframe at start")?
};
Expand All @@ -596,6 +632,11 @@ impl Fmp4 {
frame.close();

track.group = Some(group);
} else if track.kind == TrackKind::Audio {
// Close the audio group if it exists.
if let Some(group) = track.group.take() {
group.close();
}
}

if let (Some(min), Some(max), Some(min_duration)) = (min_timestamp, max_timestamp, track.min_duration) {
Expand All @@ -610,17 +651,23 @@ impl Fmp4 {
// Update the catalog with the new min_buffer
let mut catalog = self.broadcast.catalog.lock();

// We're lazy and don't keep track if this track is for audio or video, so just try to update both.
if let Some(video) = catalog.video.as_mut()
&& let Some(config) = video.renditions.get_mut(&track.producer.info.name)
{
config.min_buffer = Some(min_buffer.convert()?);
}

if let Some(audio) = catalog.audio.as_mut()
&& let Some(config) = audio.renditions.get_mut(&track.producer.info.name)
{
config.min_buffer = Some(min_buffer.convert()?);
match track.kind {
TrackKind::Video => {
let video = catalog.video.as_mut().context("missing video")?;
let config = video
.renditions
.get_mut(&track.producer.info.name)
.context("missing video config")?;
config.min_buffer = Some(min_buffer.convert()?);
}
TrackKind::Audio => {
let audio = catalog.audio.as_mut().context("missing audio")?;
let config = audio
.renditions
.get_mut(&track.producer.info.name)
.context("missing audio config")?;
config.min_buffer = Some(min_buffer.convert()?);
}
}
}
}
Expand All @@ -635,9 +682,10 @@ impl Drop for Fmp4 {
let mut catalog = self.broadcast.catalog.lock();

for track in self.tracks.values() {
// We're too lazy to keep track of if this track is for audio or video, so we just remove both.
catalog.remove_video(&track.producer.info.name);
catalog.remove_audio(&track.producer.info.name);
match track.kind {
TrackKind::Video => catalog.remove_video(&track.producer.info.name),
TrackKind::Audio => catalog.remove_audio(&track.producer.info.name),
}
}
}
}
8 changes: 5 additions & 3 deletions rs/hang/src/import/opus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use moq_lite as moq;
/// Opus decoder, initialized via a OpusHead. Does not support Ogg.
pub struct Opus {
broadcast: hang::BroadcastProducer,
track: Option<hang::TrackProducer>,
track: Option<moq_lite::TrackProducer>,
zero: Option<tokio::time::Instant>,
}

Expand Down Expand Up @@ -66,7 +66,7 @@ impl Opus {
let audio = catalog.insert_audio(track.producer.info.name.clone(), config);
audio.priority = 2;

self.track = Some(track.producer.into());
self.track = Some(track.producer);

Ok(())
}
Expand All @@ -87,7 +87,9 @@ impl Opus {
payload,
};

track.write(frame)?;
let mut group = track.append_group();
frame.encode(&mut group)?;
group.close();

Ok(())
}
Expand Down
23 changes: 23 additions & 0 deletions rs/hang/src/model/frame.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
use bytes::{Buf, BytesMut};
use derive_more::Debug;

pub use buf_list::BufList;
use moq_lite::{coding::Encode, lite};

use crate::Error;

pub type Timestamp = moq_lite::Timescale<1_000_000>;

Expand Down Expand Up @@ -32,3 +36,22 @@ pub struct Frame {
#[debug("{} bytes", payload.num_bytes())]
pub payload: BufList,
}

impl Frame {
/// Encode the frame to the given group.
pub fn encode(&self, group: &mut moq_lite::GroupProducer) -> Result<(), Error> {
let mut header = BytesMut::new();
self.timestamp.encode(&mut header, lite::Version::Draft02);

let size = header.len() + self.payload.remaining();

let mut chunked = group.create_frame(size.into());
chunked.write_chunk(header.freeze());
for chunk in &self.payload {
chunked.write_chunk(chunk.clone());
}
chunked.close();

Ok(())
}
}
14 changes: 1 addition & 13 deletions rs/hang/src/model/track.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ use crate::Error;
use crate::model::{Frame, GroupConsumer, Timestamp};
use futures::{StreamExt, stream::FuturesUnordered};

use moq_lite::{coding::*, lite};

/// A producer for media tracks.
///
/// This wraps a `moq_lite::TrackProducer` and adds hang-specific functionality
Expand Down Expand Up @@ -47,9 +45,6 @@ impl TrackProducer {
pub fn write(&mut self, frame: Frame) -> Result<(), Error> {
tracing::trace!(?frame, "write frame");

let mut header = BytesMut::new();
frame.timestamp.encode(&mut header, lite::Version::Draft02);

if frame.keyframe {
if let Some(group) = self.group.take() {
group.close();
Expand All @@ -73,14 +68,7 @@ impl TrackProducer {
None => return Err(Error::MissingKeyframe),
};

let size = header.len() + frame.payload.remaining();

let mut chunked = group.create_frame(size.into());
chunked.write_chunk(header.freeze());
for chunk in frame.payload {
chunked.write_chunk(chunk);
}
chunked.close();
frame.encode(&mut group)?;

self.group.replace(group);

Expand Down
Loading