Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 5 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ categories = ["multimedia", "network-programming", "web-programming"]
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
moq-transfork = "0.4"
moq-native = "0.5"
moq-karp = "0.8"
moq-transfork = "0.6"
moq-native = "0.5.8"
moq-karp = "0.10"

gst = { package = "gstreamer", version = "0.23" }
gst-base = { package = "gstreamer-base", version = "0.23" }
Expand All @@ -25,6 +25,8 @@ tokio = { version = "1", features = ["full"] }
env_logger = "0.9"
anyhow = { version = "1", features = ["backtrace"] }

url = "2.x"

[build-dependencies]
gst-plugin-version-helper = "0.8"

Expand Down
5 changes: 2 additions & 3 deletions dev/pub
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,5 @@ if [ ! -f dev/bbb.fmp4 ]; then
fi

# Run gstreamer and pipe the output to moq-pub
gst-launch-1.0 -v -e multifilesrc location="dev/bbb.fmp4" loop=true ! qtdemux name=demux \
demux.video_0 ! h264parse ! queue ! identity sync=true ! isofmp4mux name=mux chunk-duration=1 fragment-duration=1 ! moqsink url="$URL" broadcast="$NAME" \
demux.audio_0 ! aacparse ! queue ! mux.
# (Room hardcoded for testing)
gst-launch-1.0 -v -e multifilesrc location="dev/bbb.fmp4" loop=true ! qtdemux name=demux demux.video_0 ! h264parse ! queue ! identity sync=true ! mp4mux name=mux chunk-duration=1 fragment-duration=1 ! moqsink server="$URL" broadcast="$NAME" room="Room2" ! demux.audio_0 ! aacparse ! queue ! mux.
58 changes: 37 additions & 21 deletions src/sink/imp.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@
use anyhow::Context as _;
// use anyhow::Context as _;
use gst::glib;
use gst::prelude::*;
use gst::subclass::prelude::*;
use gst_base::subclass::prelude::*;

use moq_karp::Room;
use moq_transfork::Session;
use moq_native::{quic, tls};
use once_cell::sync::Lazy;
// use tokio::sync::broadcast;
use std::sync::Arc;
use std::sync::Mutex;
use url::Url;
use tokio::runtime::Runtime;


pub static RUNTIME: Lazy<tokio::runtime::Runtime> = Lazy::new(|| {
tokio::runtime::Builder::new_multi_thread()
Expand All @@ -19,16 +25,19 @@ pub static RUNTIME: Lazy<tokio::runtime::Runtime> = Lazy::new(|| {

#[derive(Default)]
struct Settings {
pub url: Option<String>,
pub server: String,
pub broadcast: String,
pub room: String,
pub tls_disable_verify: bool,
}

#[derive(Default)]
struct State {
pub media: Option<moq_karp::cmaf::Import>,
pub session: Option<moq_transfork::Session>,
pub room: Option<moq_karp::Room>,
pub published: bool, // true if the media has been published
pub broadcast_name: String,
}

#[derive(Default)]
Expand Down Expand Up @@ -75,7 +84,8 @@ impl ObjectImpl for MoqSink {
let mut settings = self.settings.lock().unwrap();

match pspec.name() {
"url" => settings.url = Some(value.get().unwrap()),
"server" => settings.server = value.get().unwrap(),
"room" => settings.server = value.get().unwrap(),
"broadcast" => settings.broadcast = value.get().unwrap(),
"tls-disable-verify" => settings.tls_disable_verify = value.get().unwrap(),
_ => unimplemented!(),
Expand All @@ -86,8 +96,10 @@ impl ObjectImpl for MoqSink {
let settings = self.settings.lock().unwrap();

match pspec.name() {
"url" => settings.url.to_value(),

"broadcast" => settings.broadcast.to_value(),
"server" => settings.server.to_value(),
"room" => settings.room.to_value(),
"tls-disable-verify" => settings.tls_disable_verify.to_value(),
_ => unimplemented!(),
}
Expand Down Expand Up @@ -153,8 +165,18 @@ impl BaseSinkImpl for MoqSink {

if !state.published {
if let Some(session) = state.session.as_mut() {
media.publish(session).expect("failed to publish");

let broadcast = state.room.as_ref().expect("Problem here").publish(&state.broadcast_name).expect("Error publishing");
let mut input = tokio::io::stdin();

let mut import = moq_karp::cmaf::Import::new(broadcast);

let rt = Runtime::new().expect("Error here");
//rt.block_on(import.init_from(&mut input).await.context("failed to initialize").expect("Error"));
let _ = rt.block_on(import.init_from(&mut input));

state.published = true;

}
}

Expand All @@ -167,13 +189,6 @@ impl BaseSinkImpl for MoqSink {
impl MoqSink {
fn setup(&self) -> anyhow::Result<()> {
let settings = self.settings.lock().unwrap();
let broadcast = moq_transfork::Path::default().push(&settings.broadcast);

let broadcast = moq_karp::produce::Resumable::new(broadcast).broadcast();
let media = moq_karp::cmaf::Import::new(broadcast);

let url = settings.url.clone().context("missing url")?;
let url = url.parse().context("invalid URL")?;

// TODO support TLS certs and other options
let config = quic::Args {
Expand All @@ -185,21 +200,22 @@ impl MoqSink {
}
.load()?;

let client = quic::Endpoint::new(config)?.client;

let mut state = self.state.lock().unwrap();
state.media = Some(media);
let quic = quic::Endpoint::new(config)?;
let server_url = Url::parse(&settings.server).expect("Failed to expect string");

let state = self.state.clone();
let room_name = settings.room.clone();
let broadcast_name = settings.broadcast.clone();

// We have to perform the connect in a background task because we can't block the main thread
// // We have to perform the connect in a background task because we can't block the main thread
tokio::spawn(async move {
let session = client.connect(&url).await.expect("failed to connect");
let session = moq_transfork::Session::connect(session)
.await
.expect("failed to connect");

let session = quic.client.connect(&server_url).await.expect("Failed to connect");
let session = Session::connect(session).await.expect("Failed to open session");
let room = Room::new(session.clone(), room_name);
state.lock().unwrap().session = Some(session);
state.lock().unwrap().room = Some(room);
state.lock().unwrap().broadcast_name = broadcast_name;

// TODO figure out how to close gstreamer gracefully on session close
});
Expand Down