Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion rs/hang/examples/video.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ fn create_track(broadcast: &mut moq_lite::BroadcastProducer) -> hang::TrackProdu
.produce();

// Publish the catalog track to the broadcast.
broadcast.insert_track(catalog.track.consume());
broadcast.insert_track(catalog.track.clone());

// Actually create the media track now.
let track = broadcast.create_track(video_track);
Expand Down
2 changes: 1 addition & 1 deletion rs/hang/src/model/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ pub struct BroadcastProducer {
impl BroadcastProducer {
pub fn new(mut inner: moq_lite::BroadcastProducer) -> Self {
let catalog = Catalog::default().produce();
inner.insert_track(catalog.track.consume());
inner.insert_track(catalog.track.clone());

Self {
inner,
Expand Down
2 changes: 1 addition & 1 deletion rs/moq-lite/src/ietf/subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -466,7 +466,7 @@ impl<S: web_transport_trait::Session> Subscriber<S> {
// NOTE: This is debated in the IETF draft, but is significantly easier to implement.
let mut broadcast = self.start_announce(msg.track_namespace.to_owned())?;

let exists = broadcast.insert_track(track.consume());
let exists = broadcast.insert_track(track);
if exists {
tracing::warn!(track = %msg.track_name, "track already exists, replacing it");
}
Expand Down
69 changes: 10 additions & 59 deletions rs/moq-lite/src/lite/publisher.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::sync::Arc;

use futures::{FutureExt, StreamExt, stream::FuturesUnordered};
use web_async::FuturesExt;

use crate::{
Expand Down Expand Up @@ -215,79 +216,29 @@ impl<S: web_transport_trait::Session> Publisher<S> {
priority: PriorityQueue,
version: Version,
) -> Result<(), Error> {
// TODO use a BTreeMap serve the latest N groups by sequence.
// Until then, we'll implement N=2 manually.
// Also, this is more complicated because we can't use tokio because of WASM.
// We need to drop futures in order to cancel them and keep polling them with select!
let mut old_group = None;
let mut new_group = None;

// Annoying that we can't use a tuple here as we need the compiler to infer the type.
// Otherwise we'd have to pick Send or !Send...
let mut old_sequence = None;
let mut new_sequence = None;

// Keep reading groups from the track, some of which may arrive out of order.
let mut tasks = FuturesUnordered::new();

loop {
let group = tokio::select! {
biased;
// Poll all active group futures; never matches but keeps them running.
true = async {
while tasks.next().await.is_some() {}
false
} => unreachable!(),
Some(group) = track.next_group().transpose() => group,
Some(_) = async { Some(old_group.as_mut()?.await) } => {
old_group = None;
old_sequence = None;
continue;
},
Some(_) = async { Some(new_group.as_mut()?.await) } => {
new_group = old_group;
new_sequence = old_sequence;
old_group = None;
old_sequence = None;
continue;
},
else => return Ok(()),
}?;

let sequence = group.info.sequence;
let latest = new_sequence.as_ref().unwrap_or(&0);

tracing::debug!(subscribe = %subscribe.id, track = %track.info.name, sequence, latest, "serving group");

// If this group is older than the oldest group we're serving, skip it.
// We always serve at most two groups, but maybe we should serve only sequence >= MAX-1.
if sequence < *old_sequence.as_ref().unwrap_or(&0) {
tracing::debug!(subscribe = %subscribe.id, track = %track.info.name, old = %sequence, %latest, "skipping group");
continue;
}
tracing::debug!(subscribe = %subscribe.id, track = %track.info.name, sequence, "serving group");

let msg = lite::Group {
subscribe: subscribe.id,
sequence,
};

let priority = priority.insert(track.info.priority, sequence);

// Spawn a task to serve this group, ignoring any errors because they don't really matter.
// TODO add some logging at least.
let handle = Box::pin(Self::serve_group(session.clone(), msg, priority, group, version));

// Terminate the old group if it's still running.
if let Some(old_sequence) = old_sequence.take() {
tracing::debug!(subscribe = %subscribe.id, track = %track.info.name, old = %old_sequence, %latest, "aborting group");
old_group.take(); // Drop the future to cancel it.
}

assert!(old_group.is_none());

if sequence >= *latest {
old_group = new_group;
old_sequence = new_sequence;

new_group = Some(handle);
new_sequence = Some(sequence);
} else {
old_group = Some(handle);
old_sequence = Some(sequence);
}
tasks.push(Self::serve_group(session.clone(), msg, priority, group, version).map(|_| ()));
}
}

Expand Down
58 changes: 25 additions & 33 deletions rs/moq-lite/src/model/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ use super::Track;
struct State {
// When explicitly publishing, we hold a reference to the consumer.
// This prevents the track from being marked as "unused".
published: HashMap<String, TrackConsumer>,
consumers: HashMap<String, TrackConsumer>,

// When requesting, we hold a reference to the producer for dynamic tracks.
// The track will be marked as "unused" when the last consumer is dropped.
requested: HashMap<String, TrackProducer>,
producers: HashMap<String, TrackProducer>,
}

/// A collection of media tracks that can be published and subscribed to.
Expand Down Expand Up @@ -58,8 +58,8 @@ impl BroadcastProducer {
pub fn new() -> Self {
Self {
state: Lock::new(State {
published: HashMap::new(),
requested: HashMap::new(),
consumers: HashMap::new(),
producers: HashMap::new(),
}),
closed: Default::default(),
requested: async_channel::unbounded(),
Expand All @@ -74,24 +74,24 @@ impl BroadcastProducer {

/// Produce a new track and insert it into the broadcast.
pub fn create_track(&mut self, track: Track) -> TrackProducer {
let track = track.produce();
self.insert_track(track.consume());
let track = TrackProducer::new(track);
self.insert_track(track.clone());
track
}

/// Insert a track into the lookup, returning true if it was unique.
pub fn insert_track(&mut self, track: TrackConsumer) -> bool {
///
/// NOTE: You probably want to [TrackProducer::clone] to keep publishing to the track.
pub fn insert_track(&mut self, track: TrackProducer) -> bool {
let mut state = self.state.lock();
let unique = state.published.insert(track.info.name.clone(), track.clone()).is_none();
let removed = state.requested.remove(&track.info.name).is_some();

unique && !removed
state.consumers.insert(track.info.name.clone(), track.consume());
state.producers.insert(track.info.name.clone(), track).is_none()
}

/// Remove a track from the lookup.
pub fn remove_track(&mut self, name: &str) -> bool {
let mut state = self.state.lock();
state.published.remove(name).is_some() || state.requested.remove(name).is_some()
state.consumers.remove(name).is_some() || state.producers.remove(name).is_some()
}

pub fn consume(&self) -> BroadcastConsumer {
Expand Down Expand Up @@ -150,8 +150,8 @@ impl Drop for BroadcastProducer {
let mut state = self.state.lock();

// Cleanup any published tracks.
state.published.clear();
state.requested.clear();
state.consumers.clear();
state.producers.clear();
}
}

Expand Down Expand Up @@ -192,13 +192,7 @@ impl BroadcastConsumer {
pub fn subscribe_track(&self, track: &Track) -> TrackConsumer {
let mut state = self.state.lock();

// Return any explictly published track.
if let Some(consumer) = state.published.get(&track.name).cloned() {
return consumer;
}

// Return any requested tracks.
if let Some(producer) = state.requested.get(&track.name) {
if let Some(producer) = state.producers.get(&track.name) {
return producer.consume();
}

Expand All @@ -219,13 +213,13 @@ impl BroadcastConsumer {
}

// Insert the producer into the lookup so we will deduplicate requests.
state.requested.insert(producer.info.name.clone(), producer.clone());
state.producers.insert(producer.info.name.clone(), producer.clone());

// Remove the track from the lookup when it's unused.
let state = self.state.clone();
web_async::spawn(async move {
producer.unused().await;
state.lock().requested.remove(&producer.info.name);
state.lock().producers.remove(&producer.info.name);
});

consumer
Expand Down Expand Up @@ -268,19 +262,19 @@ mod test {
let mut track1 = Track::new("track1").produce();

// Make sure we can insert before a consumer is created.
producer.insert_track(track1.consume());
producer.insert_track(track1.clone());
track1.append_group();

let consumer = producer.consume();

let mut track1_sub = consumer.subscribe_track(&track1.info);
let mut track1_sub = consumer.subscribe_track(&Track::new("track1"));
track1_sub.assert_group();

let mut track2 = Track::new("track2").produce();
producer.insert_track(track2.consume());
producer.insert_track(track2.clone());

let consumer2 = producer.consume();
let mut track2_consumer = consumer2.subscribe_track(&track2.info);
let mut track2_consumer = consumer2.subscribe_track(&Track::new("track2"));
track2_consumer.assert_no_group();

track2.append_group();
Expand Down Expand Up @@ -330,9 +324,8 @@ mod test {
consumer.assert_not_closed();

// Create a new track and insert it into the broadcast.
let mut track1 = Track::new("track1").produce();
let mut track1 = producer.create_track(Track::new("track1"));
track1.append_group();
producer.insert_track(track1.consume());

let mut track1c = consumer.subscribe_track(&track1.info);
let track2 = consumer.subscribe_track(&Track::new("track2"));
Expand Down Expand Up @@ -405,10 +398,9 @@ mod test {
#[tokio::test]
async fn requested_unused() {
let mut broadcast = Broadcast::produce();
let consumer = broadcast.consume();

// Subscribe to a track that doesn't exist - this creates a request
let consumer1 = consumer.subscribe_track(&Track::new("unknown_track"));
let consumer1 = broadcast.consume().subscribe_track(&Track::new("unknown_track"));

// Get the requested track producer
let producer1 = broadcast.assert_request();
Expand All @@ -420,7 +412,7 @@ mod test {
);

// Making a new consumer will keep the producer alive
let consumer2 = consumer.subscribe_track(&Track::new("unknown_track"));
let consumer2 = broadcast.consume().subscribe_track(&Track::new("unknown_track"));
consumer2.assert_is_clone(&consumer1);

// Drop the consumer subscription
Expand All @@ -447,7 +439,7 @@ mod test {
tokio::time::sleep(std::time::Duration::from_millis(1)).await;

// Now the cleanup task should have run and we can subscribe again to the unknown track.
let consumer3 = consumer.subscribe_track(&Track::new("unknown_track"));
let consumer3 = broadcast.consume().subscribe_track(&Track::new("unknown_track"));
let producer2 = broadcast.assert_request();

// Drop the consumer, now the producer should be unused
Expand Down
20 changes: 20 additions & 0 deletions rs/moq-lite/src/model/group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,4 +215,24 @@ impl GroupConsumer {
}
}
}

pub async fn get_frame(&self, index: usize) -> Result<Option<FrameConsumer>> {
let mut state = self.state.clone();
let Ok(state) = state
.wait_for(|state| index < state.frames.len() || state.closed.is_some())
.await
else {
return Err(Error::Cancel);
};

if let Some(frame) = state.frames.get(index).cloned() {
return Ok(Some(frame));
}

match &state.closed {
Some(Ok(_)) => Ok(None),
Some(Err(err)) => Err(err.clone()),
_ => unreachable!(),
}
}
}
Loading
Loading