diff --git a/Cargo.lock b/Cargo.lock index 5ca0c19..8aec855 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1204,19 +1204,19 @@ dependencies = [ "moq-transfork", "once_cell", "tokio", + "url", ] [[package]] name = "moq-karp" -version = "0.8.0" +version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f1c84b42dbdc0414b8a22f6521bbaa0fefa4913d393206803c40991d915f9e95" +checksum = "0dd8291f9a0c82f4356ce4f330764558eee60d68e93ee22d56cda2c153337058" dependencies = [ "anyhow", "bytes", "clap", "derive_more", - "futures", "hex", "lazy_static", "moq-native", @@ -1234,9 +1234,9 @@ dependencies = [ [[package]] name = "moq-native" -version = "0.5.6" +version = "0.5.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "905cc2a2193bdbe1c48d3c8f08f7044fbb5b550d720fa7572b101afee7686bde" +checksum = "089a2cbfca0e296f9b996e01dc2e0f91010eb00384896670a503e86a1bf54e91" dependencies = [ "anyhow", "clap", @@ -1258,9 +1258,9 @@ dependencies = [ [[package]] name = "moq-transfork" -version = "0.4.3" +version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f7bca71d1b85893f79ffc0dd9f1f25627019879ec6ae6cff398954862859f34a" +checksum = "a41ef6c9e82bd54628e96950928d9bd818988873b6e22903d23ce6974b5333d4" dependencies = [ "bytes", "futures", diff --git a/Cargo.toml b/Cargo.toml index d0dc206..344960c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,9 +14,9 @@ categories = ["multimedia", "network-programming", "web-programming"] # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -moq-transfork = "0.4" -moq-native = "0.5" -moq-karp = "0.8" +moq-transfork = "0.6" +moq-native = "0.5.8" +moq-karp = "0.10" gst = { package = "gstreamer", version = "0.23" } gst-base = { package = "gstreamer-base", version = "0.23" } @@ -25,6 +25,8 @@ tokio = { version = "1", features = ["full"] } env_logger = "0.9" anyhow = { version = "1", features = ["backtrace"] } +url = "2.x" + [build-dependencies] gst-plugin-version-helper = "0.8" diff --git a/dev/pub b/dev/pub index bb51815..4749f7c 100755 --- a/dev/pub +++ b/dev/pub @@ -46,6 +46,5 @@ if [ ! -f dev/bbb.fmp4 ]; then fi # Run gstreamer and pipe the output to moq-pub -gst-launch-1.0 -v -e multifilesrc location="dev/bbb.fmp4" loop=true ! qtdemux name=demux \ - demux.video_0 ! h264parse ! queue ! identity sync=true ! isofmp4mux name=mux chunk-duration=1 fragment-duration=1 ! moqsink url="$URL" broadcast="$NAME" \ - demux.audio_0 ! aacparse ! queue ! mux. +# (Room hardcoded for testing) +gst-launch-1.0 -v -e multifilesrc location="dev/bbb.fmp4" loop=true ! qtdemux name=demux demux.video_0 ! h264parse ! queue ! identity sync=true ! mp4mux name=mux chunk-duration=1 fragment-duration=1 ! moqsink server="$URL" broadcast="$NAME" room="Room2" ! demux.audio_0 ! aacparse ! queue ! mux. diff --git a/src/sink/imp.rs b/src/sink/imp.rs index 7a0ae93..5ba28fd 100644 --- a/src/sink/imp.rs +++ b/src/sink/imp.rs @@ -1,13 +1,19 @@ -use anyhow::Context as _; +// use anyhow::Context as _; use gst::glib; use gst::prelude::*; use gst::subclass::prelude::*; use gst_base::subclass::prelude::*; +use moq_karp::Room; +use moq_transfork::Session; use moq_native::{quic, tls}; use once_cell::sync::Lazy; +// use tokio::sync::broadcast; use std::sync::Arc; use std::sync::Mutex; +use url::Url; +use tokio::runtime::Runtime; + pub static RUNTIME: Lazy = Lazy::new(|| { tokio::runtime::Builder::new_multi_thread() @@ -19,8 +25,9 @@ pub static RUNTIME: Lazy = Lazy::new(|| { #[derive(Default)] struct Settings { - pub url: Option, + pub server: String, pub broadcast: String, + pub room: String, pub tls_disable_verify: bool, } @@ -28,7 +35,9 @@ struct Settings { struct State { pub media: Option, pub session: Option, + pub room: Option, pub published: bool, // true if the media has been published + pub broadcast_name: String, } #[derive(Default)] @@ -75,7 +84,8 @@ impl ObjectImpl for MoqSink { let mut settings = self.settings.lock().unwrap(); match pspec.name() { - "url" => settings.url = Some(value.get().unwrap()), + "server" => settings.server = value.get().unwrap(), + "room" => settings.server = value.get().unwrap(), "broadcast" => settings.broadcast = value.get().unwrap(), "tls-disable-verify" => settings.tls_disable_verify = value.get().unwrap(), _ => unimplemented!(), @@ -86,8 +96,10 @@ impl ObjectImpl for MoqSink { let settings = self.settings.lock().unwrap(); match pspec.name() { - "url" => settings.url.to_value(), + "broadcast" => settings.broadcast.to_value(), + "server" => settings.server.to_value(), + "room" => settings.room.to_value(), "tls-disable-verify" => settings.tls_disable_verify.to_value(), _ => unimplemented!(), } @@ -153,8 +165,18 @@ impl BaseSinkImpl for MoqSink { if !state.published { if let Some(session) = state.session.as_mut() { - media.publish(session).expect("failed to publish"); + + let broadcast = state.room.as_ref().expect("Problem here").publish(&state.broadcast_name).expect("Error publishing"); + let mut input = tokio::io::stdin(); + + let mut import = moq_karp::cmaf::Import::new(broadcast); + + let rt = Runtime::new().expect("Error here"); + //rt.block_on(import.init_from(&mut input).await.context("failed to initialize").expect("Error")); + let _ = rt.block_on(import.init_from(&mut input)); + state.published = true; + } } @@ -167,13 +189,6 @@ impl BaseSinkImpl for MoqSink { impl MoqSink { fn setup(&self) -> anyhow::Result<()> { let settings = self.settings.lock().unwrap(); - let broadcast = moq_transfork::Path::default().push(&settings.broadcast); - - let broadcast = moq_karp::produce::Resumable::new(broadcast).broadcast(); - let media = moq_karp::cmaf::Import::new(broadcast); - - let url = settings.url.clone().context("missing url")?; - let url = url.parse().context("invalid URL")?; // TODO support TLS certs and other options let config = quic::Args { @@ -185,21 +200,22 @@ impl MoqSink { } .load()?; - let client = quic::Endpoint::new(config)?.client; - - let mut state = self.state.lock().unwrap(); - state.media = Some(media); + let quic = quic::Endpoint::new(config)?; + let server_url = Url::parse(&settings.server).expect("Failed to expect string"); let state = self.state.clone(); + let room_name = settings.room.clone(); + let broadcast_name = settings.broadcast.clone(); - // We have to perform the connect in a background task because we can't block the main thread + // // We have to perform the connect in a background task because we can't block the main thread tokio::spawn(async move { - let session = client.connect(&url).await.expect("failed to connect"); - let session = moq_transfork::Session::connect(session) - .await - .expect("failed to connect"); + let session = quic.client.connect(&server_url).await.expect("Failed to connect"); + let session = Session::connect(session).await.expect("Failed to open session"); + let room = Room::new(session.clone(), room_name); state.lock().unwrap().session = Some(session); + state.lock().unwrap().room = Some(room); + state.lock().unwrap().broadcast_name = broadcast_name; // TODO figure out how to close gstreamer gracefully on session close });