From 234dff8dbb655dd7fa7bfc4ee8167e076f8640b9 Mon Sep 17 00:00:00 2001 From: Philippe Normand Date: Mon, 29 Jun 2020 14:08:51 +0100 Subject: [PATCH] webrtc: Add Janus video-room example This Rust crate provides a program able to connect to a Janus instance using WebSockets and send a live video stream to the videoroom plugin. Part-of: --- webrtc/janus/rust/Cargo.lock | 1099 ++++++++++++++++++++++++++++++++++++++++ webrtc/janus/rust/Cargo.toml | 25 + webrtc/janus/rust/src/janus.rs | 707 ++++++++++++++++++++++++++ webrtc/janus/rust/src/main.rs | 186 +++++++ 4 files changed, 2017 insertions(+) create mode 100644 webrtc/janus/rust/Cargo.lock create mode 100644 webrtc/janus/rust/Cargo.toml create mode 100644 webrtc/janus/rust/src/janus.rs create mode 100644 webrtc/janus/rust/src/main.rs diff --git a/webrtc/janus/rust/Cargo.lock b/webrtc/janus/rust/Cargo.lock new file mode 100644 index 0000000..3cde9a5 --- /dev/null +++ b/webrtc/janus/rust/Cargo.lock @@ -0,0 +1,1099 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. +[[package]] +name = "aho-corasick" +version = "0.7.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "043164d8ba5c4c3035fec9bbee8647c0261d788f3474306f93bb65901cae0e86" +dependencies = [ + "memchr", +] + +[[package]] +name = "anyhow" +version = "1.0.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85bb70cc08ec97ca5450e6eba421deeea5f172c0fc61f78b5357b2a8e8be195f" + +[[package]] +name = "async-tungstenite" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c35882200d230428ae9cf74acb72aaa9c0a912911b4dad8fb890da49608e127" +dependencies = [ + "futures-io", + "futures-util", + "gio", + "glib", + "log", + "pin-project", + "tungstenite", +] + +[[package]] +name = "atty" +version = "0.2.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8" +dependencies = [ + "hermit-abi", + "libc", + "winapi", +] + +[[package]] +name = "autocfg" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8aac770f1885fd7e387acedd76065302551364496e46b3dd00860b2f8359b9d" + +[[package]] +name = "base64" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3441f0f7b02788e948e47f457ca01f1d7e6d92c693bc132c22b087d3141c03ff" + +[[package]] +name = "bitflags" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693" + +[[package]] +name = "block-buffer" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c0940dc441f31689269e10ac70eb1002a3a1d3ad1390e030043662eb7fe4688b" +dependencies = [ + "block-padding", + "byte-tools", + "byteorder", + "generic-array", +] + +[[package]] +name = "block-padding" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa79dedbb091f449f1f39e53edf88d5dbe95f895dae6135a8d7b881fb5af73f5" +dependencies = [ + "byte-tools", +] + +[[package]] +name = "byte-tools" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3b5ca7a04898ad4bcd41c90c5285445ff5b791899bb1b0abdd2a2aa791211d7" + +[[package]] +name = "byteorder" +version = "1.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08c48aae112d48ed9f069b33538ea9e3e90aa263cfa3d1c24309612b1f7472de" + +[[package]] +name = "bytes" +version = "0.5.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "118cf036fbb97d0816e3c34b2d7a1e8cfc60f68fcf63d550ddbe9bd5f59c213b" +dependencies = [ + "loom", +] + +[[package]] +name = "cc" +version = "1.0.55" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b1be3409f94d7bdceeb5f5fac551039d9b3f00e25da7a74fc4d33400a0d96368" + +[[package]] +name = "cfg-if" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822" + +[[package]] +name = "clap" +version = "2.33.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bdfa80d47f954d53a35a64987ca1422f495b8d6483c0fe9f7117b36c2a792129" +dependencies = [ + "bitflags", + "textwrap", + "unicode-width", +] + +[[package]] +name = "digest" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3d0c8c8752312f9713efd397ff63acb9f85585afbf179282e720e7704954dd5" +dependencies = [ + "generic-array", +] + +[[package]] +name = "env_logger" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "44533bbbb3bb3c1fa17d9f2e4e38bbbaf8396ba82193c4cb1b6445d711445d36" +dependencies = [ + "atty", + "humantime", + "log", + "regex", + "termcolor", +] + +[[package]] +name = "fake-simd" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e88a8acf291dafb59c2d96e8f59828f3838bb1a70398823ade51a84de6a6deed" + +[[package]] +name = "fnv" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" + +[[package]] +name = "futures" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e05b85ec287aac0dc34db7d4a569323df697f9c55b99b15d6b4ef8cde49f613" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-channel" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f366ad74c28cca6ba456d95e6422883cfb4b252a83bed929c83abfdbbf2967d5" +dependencies = [ + "futures-core", + "futures-sink", +] + +[[package]] +name = "futures-core" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59f5fff90fd5d971f936ad674802482ba441b6f09ba5e15fd8b39145582ca399" + +[[package]] +name = "futures-executor" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "10d6bb888be1153d3abeb9006b11b02cf5e9b209fda28693c31ae1e4e012e314" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-io" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "de27142b013a8e869c14957e6d2edeef89e97c289e69d042ee3a49acd8b51789" + +[[package]] +name = "futures-macro" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d0b5a30a4328ab5473878237c447333c093297bded83a4983d10f4deea240d39" +dependencies = [ + "proc-macro-hack", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "futures-sink" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f2032893cb734c7a05d85ce0cc8b8c4075278e93b24b66f9de99d6eb0fa8acc" + +[[package]] +name = "futures-task" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bdb66b5f09e22019b1ab0830f7785bcea8e7a42148683f99214f73f8ec21a626" +dependencies = [ + "once_cell", +] + +[[package]] +name = "futures-util" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8764574ff08b701a084482c3c7031349104b07ac897393010494beaa18ce32c6" +dependencies = [ + "futures-channel", + "futures-core", + "futures-io", + "futures-macro", + "futures-sink", + "futures-task", + "memchr", + "pin-project", + "pin-utils", + "proc-macro-hack", + "proc-macro-nested", + "slab", +] + +[[package]] +name = "generator" +version = "0.6.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "add72f17bb81521258fcc8a7a3245b1e184e916bfbe34f0ea89558f440df5c68" +dependencies = [ + "cc", + "libc", + "log", + "rustc_version", + "winapi", +] + +[[package]] +name = "generic-array" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c68f0274ae0e023facc3c97b2e00f076be70e254bc851d972503b328db79b2ec" +dependencies = [ + "typenum", +] + +[[package]] +name = "getrandom" +version = "0.1.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7abc8dd8451921606d809ba32e95b6111925cd2906060d2dcc29c070220503eb" +dependencies = [ + "cfg-if", + "libc", + "wasi", +] + +[[package]] +name = "gio" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0cd10f9415cce39b53f8024bf39a21f84f8157afa52da53837b102e585a296a5" +dependencies = [ + "bitflags", + "futures-channel", + "futures-core", + "futures-io", + "futures-util", + "gio-sys", + "glib", + "glib-sys", + "gobject-sys", + "lazy_static", + "libc", +] + +[[package]] +name = "gio-sys" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4fad225242b9eae7ec8a063bb86974aca56885014672375e5775dc0ea3533911" +dependencies = [ + "glib-sys", + "gobject-sys", + "libc", + "pkg-config", +] + +[[package]] +name = "glib" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "40fb573a09841b6386ddf15fd4bc6655b4f5b106ca962f57ecaecde32a0061c0" +dependencies = [ + "bitflags", + "futures-channel", + "futures-core", + "futures-executor", + "futures-task", + "futures-util", + "glib-sys", + "gobject-sys", + "lazy_static", + "libc", +] + +[[package]] +name = "glib-sys" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "95856f3802f446c05feffa5e24859fe6a183a7cb849c8449afc35c86b1e316e2" +dependencies = [ + "libc", + "pkg-config", +] + +[[package]] +name = "gobject-sys" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "31d1a804f62034eccf370006ccaef3708a71c31d561fee88564abe71177553d9" +dependencies = [ + "glib-sys", + "libc", + "pkg-config", +] + +[[package]] +name = "gstreamer" +version = "0.15.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ce8664a114cd6ec16bece783d5eee59496919915b1f6884400ba4a953274a163" +dependencies = [ + "bitflags", + "cfg-if", + "futures-channel", + "futures-core", + "futures-util", + "glib", + "glib-sys", + "gobject-sys", + "gstreamer-sys", + "lazy_static", + "libc", + "muldiv", + "num-rational", + "paste", +] + +[[package]] +name = "gstreamer-sdp" +version = "0.15.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "547b3b0eb9e01e13ab5cc066c817e2ab758f83790145f80f62d3c8e43c2966af" +dependencies = [ + "glib", + "glib-sys", + "gobject-sys", + "gstreamer", + "gstreamer-sdp-sys", + "gstreamer-sys", +] + +[[package]] +name = "gstreamer-sdp-sys" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "99e88ac4f9f20323ef3409dddcea3bbf58364ff8eea10b14da5303bfcb23347a" +dependencies = [ + "glib-sys", + "gobject-sys", + "gstreamer-sys", + "libc", + "pkg-config", +] + +[[package]] +name = "gstreamer-sys" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d18da01b97d0ab5896acd5151e4c155acefd0e6c03c3dd24dd133ba054053db" +dependencies = [ + "glib-sys", + "gobject-sys", + "libc", + "pkg-config", +] + +[[package]] +name = "gstreamer-webrtc" +version = "0.15.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1f433d1294266fb1d65e1dc2d4de365f7f4caf23cb72db3a3bd6904eeec88cf1" +dependencies = [ + "glib", + "glib-sys", + "gobject-sys", + "gstreamer", + "gstreamer-sdp", + "gstreamer-sys", + "gstreamer-webrtc-sys", + "libc", +] + +[[package]] +name = "gstreamer-webrtc-sys" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f392bd821b42efecfc21016c8ef20da188b45a45bbb5ddf81758704f93aae615" +dependencies = [ + "glib-sys", + "gobject-sys", + "gstreamer-sdp-sys", + "gstreamer-sys", + "libc", + "pkg-config", +] + +[[package]] +name = "heck" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "20564e78d53d2bb135c343b3f47714a56af2061f1c928fdb541dc7b9fdd94205" +dependencies = [ + "unicode-segmentation", +] + +[[package]] +name = "hermit-abi" +version = "0.1.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9586eedd4ce6b3c498bc3b4dd92fc9f11166aa908a914071953768066c67909" +dependencies = [ + "libc", +] + +[[package]] +name = "http" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "28d569972648b2c512421b5f2a405ad6ac9666547189d0c5477a3f200f3e02f9" +dependencies = [ + "bytes", + "fnv", + "itoa", +] + +[[package]] +name = "httparse" +version = "1.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd179ae861f0c2e53da70d892f5f3029f9594be0c41dc5269cd371691b1dc2f9" + +[[package]] +name = "humantime" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df004cfca50ef23c36850aaaa59ad52cc70d0e90243c3c7737a4dd32dc7a3c4f" +dependencies = [ + "quick-error", +] + +[[package]] +name = "idna" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "02e2673c30ee86b5b96a9cb52ad15718aa1f966f5ab9ad54a8b95d5ca33120a9" +dependencies = [ + "matches", + "unicode-bidi", + "unicode-normalization", +] + +[[package]] +name = "input_buffer" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19a8a95243d5a0398cae618ec29477c6e3cb631152be5c19481f80bc71559754" +dependencies = [ + "bytes", +] + +[[package]] +name = "itoa" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc6f3ad7b9d11a0c00842ff8de1b60ee58661048eb8049ed33c73594f359d7e6" + +[[package]] +name = "janus-video-room" +version = "0.1.0" +dependencies = [ + "anyhow", + "async-tungstenite", + "env_logger", + "futures", + "gio", + "glib", + "gstreamer", + "gstreamer-sdp", + "gstreamer-webrtc", + "http", + "log", + "rand", + "serde", + "serde_derive", + "serde_json", + "structopt", + "url", +] + +[[package]] +name = "lazy_static" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" + +[[package]] +name = "libc" +version = "0.2.71" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9457b06509d27052635f90d6466700c65095fdf75409b3fbdd903e988b886f49" + +[[package]] +name = "log" +version = "0.4.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "14b6052be84e6b71ab17edffc2eeabf5c2c3ae1fdb464aae35ac50c67a44e1f7" +dependencies = [ + "cfg-if", +] + +[[package]] +name = "loom" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ecc775857611e1df29abba5c41355cdf540e7e9d4acfdf0f355eefee82330b7" +dependencies = [ + "cfg-if", + "generator", + "scoped-tls", +] + +[[package]] +name = "matches" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7ffc5c5338469d4d3ea17d269fa8ea3512ad247247c30bd2df69e68309ed0a08" + +[[package]] +name = "memchr" +version = "2.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3728d817d99e5ac407411fa471ff9800a778d88a24685968b36824eaf4bee400" + +[[package]] +name = "muldiv" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0419348c027fa7be448d2ae7ea0e4e04c2334c31dc4e74ab29f00a2a7ca69204" + +[[package]] +name = "num-integer" +version = "0.1.43" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8d59457e662d541ba17869cf51cf177c0b5f0cbf476c66bdc90bf1edac4f875b" +dependencies = [ + "autocfg", + "num-traits", +] + +[[package]] +name = "num-rational" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c000134b5dbf44adc5cb772486d335293351644b801551abe8f75c84cfa4aef" +dependencies = [ + "autocfg", + "num-integer", + "num-traits", +] + +[[package]] +name = "num-traits" +version = "0.2.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac267bcc07f48ee5f8935ab0d24f316fb722d7a1292e2913f0cc196b29ffd611" +dependencies = [ + "autocfg", +] + +[[package]] +name = "once_cell" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b631f7e854af39a1739f401cf34a8a013dfe09eac4fa4dba91e9768bd28168d" + +[[package]] +name = "opaque-debug" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2839e79665f131bdb5782e51f2c6c9599c133c6098982a54c794358bf432529c" + +[[package]] +name = "paste" +version = "0.1.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "45ca20c77d80be666aef2b45486da86238fabe33e38306bd3118fe4af33fa880" +dependencies = [ + "paste-impl", + "proc-macro-hack", +] + +[[package]] +name = "paste-impl" +version = "0.1.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d95a7db200b97ef370c8e6de0088252f7e0dfff7d047a28528e47456c0fc98b6" +dependencies = [ + "proc-macro-hack", +] + +[[package]] +name = "percent-encoding" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e" + +[[package]] +name = "pin-project" +version = "0.4.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "12e3a6cdbfe94a5e4572812a0201f8c0ed98c1c452c7b8563ce2276988ef9c17" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "0.4.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a0ffd45cf79d88737d7cc85bfd5d2894bee1139b356e616fe85dc389c61aaf7" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "pin-utils" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" + +[[package]] +name = "pkg-config" +version = "0.3.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05da548ad6865900e60eaba7f589cc0783590a92e940c26953ff81ddbab2d677" + +[[package]] +name = "ppv-lite86" +version = "0.2.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "237a5ed80e274dbc66f86bd59c1e25edc039660be53194b5fe0a482e0f2612ea" + +[[package]] +name = "proc-macro-error" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc175e9777c3116627248584e8f8b3e2987405cabe1c0adf7d1dd28f09dc7880" +dependencies = [ + "proc-macro-error-attr", + "proc-macro2", + "quote", + "syn", + "version_check", +] + +[[package]] +name = "proc-macro-error-attr" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3cc9795ca17eb581285ec44936da7fc2335a3f34f2ddd13118b6f4d515435c50" +dependencies = [ + "proc-macro2", + "quote", + "syn", + "syn-mid", + "version_check", +] + +[[package]] +name = "proc-macro-hack" +version = "0.5.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e0456befd48169b9f13ef0f0ad46d492cf9d2dbb918bcf38e01eed4ce3ec5e4" + +[[package]] +name = "proc-macro-nested" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eba180dafb9038b050a4c280019bbedf9f2467b61e5d892dcad585bb57aadc5a" + +[[package]] +name = "proc-macro2" +version = "1.0.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "beae6331a816b1f65d04c45b078fd8e6c93e8071771f41b8163255bbd8d7c8fa" +dependencies = [ + "unicode-xid", +] + +[[package]] +name = "quick-error" +version = "1.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0" + +[[package]] +name = "quote" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aa563d17ecb180e500da1cfd2b028310ac758de548efdd203e18f283af693f37" +dependencies = [ + "proc-macro2", +] + +[[package]] +name = "rand" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a6b1679d49b24bbfe0c803429aa1874472f50d9b363131f0e89fc356b544d03" +dependencies = [ + "getrandom", + "libc", + "rand_chacha", + "rand_core", + "rand_hc", +] + +[[package]] +name = "rand_chacha" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f4c8ed856279c9737206bf725bf36935d8666ead7aa69b52be55af369d193402" +dependencies = [ + "ppv-lite86", + "rand_core", +] + +[[package]] +name = "rand_core" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "90bde5296fc891b0cef12a6d03ddccc162ce7b2aff54160af9338f8d40df6d19" +dependencies = [ + "getrandom", +] + +[[package]] +name = "rand_hc" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca3129af7b92a17112d59ad498c6f81eaf463253766b90396d39ea7a39d6613c" +dependencies = [ + "rand_core", +] + +[[package]] +name = "regex" +version = "1.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c3780fcf44b193bc4d09f36d2a3c87b251da4a046c87795a0d35f4f927ad8e6" +dependencies = [ + "aho-corasick", + "memchr", + "regex-syntax", + "thread_local", +] + +[[package]] +name = "regex-syntax" +version = "0.6.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26412eb97c6b088a6997e05f69403a802a92d520de2f8e63c2b65f9e0f47c4e8" + +[[package]] +name = "rustc_version" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "138e3e0acb6c9fb258b19b67cb8abd63c00679d2851805ea151465464fe9030a" +dependencies = [ + "semver", +] + +[[package]] +name = "ryu" +version = "1.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "71d301d4193d031abdd79ff7e3dd721168a9572ef3fe51a1517aba235bd8f86e" + +[[package]] +name = "scoped-tls" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "332ffa32bf586782a3efaeb58f127980944bbc8c4d6913a86107ac2a5ab24b28" + +[[package]] +name = "semver" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d7eb9ef2c18661902cc47e535f9bc51b78acd254da71d375c2f6720d9a40403" +dependencies = [ + "semver-parser", +] + +[[package]] +name = "semver-parser" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "388a1df253eca08550bef6c72392cfe7c30914bf41df5269b68cbd6ff8f570a3" + +[[package]] +name = "serde" +version = "1.0.114" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5317f7588f0a5078ee60ef675ef96735a1442132dc645eb1d12c018620ed8cd3" + +[[package]] +name = "serde_derive" +version = "1.0.114" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2a0be94b04690fbaed37cddffc5c134bf537c8e3329d53e982fe04c374978f8e" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "serde_json" +version = "1.0.55" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec2c5d7e739bc07a3e73381a39d61fdb5f671c60c1df26a130690665803d8226" +dependencies = [ + "itoa", + "ryu", + "serde", +] + +[[package]] +name = "sha-1" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7d94d0bede923b3cea61f3f1ff57ff8cdfd77b400fb8f9998949e0cf04163df" +dependencies = [ + "block-buffer", + "digest", + "fake-simd", + "opaque-debug", +] + +[[package]] +name = "slab" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c111b5bd5695e56cffe5129854aa230b39c93a305372fdbb2668ca2394eea9f8" + +[[package]] +name = "structopt" +version = "0.3.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "de2f5e239ee807089b62adce73e48c625e0ed80df02c7ab3f068f5db5281065c" +dependencies = [ + "clap", + "lazy_static", + "structopt-derive", +] + +[[package]] +name = "structopt-derive" +version = "0.4.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "510413f9de616762a4fbeab62509bf15c729603b72d7cd71280fbca431b1c118" +dependencies = [ + "heck", + "proc-macro-error", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "syn" +version = "1.0.33" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e8d5d96e8cbb005d6959f119f773bfaebb5684296108fb32600c00cde305b2cd" +dependencies = [ + "proc-macro2", + "quote", + "unicode-xid", +] + +[[package]] +name = "syn-mid" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7be3539f6c128a931cf19dcee741c1af532c7fd387baa739c03dd2e96479338a" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "termcolor" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bb6bfa289a4d7c5766392812c0a1f4c1ba45afa1ad47803c11e1f407d846d75f" +dependencies = [ + "winapi-util", +] + +[[package]] +name = "textwrap" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d326610f408c7a4eb6f51c37c330e496b08506c9457c9d34287ecc38809fb060" +dependencies = [ + "unicode-width", +] + +[[package]] +name = "thread_local" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d40c6d1b69745a6ec6fb1ca717914848da4b44ae29d9b3080cbee91d72a69b14" +dependencies = [ + "lazy_static", +] + +[[package]] +name = "tinyvec" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "53953d2d3a5ad81d9f844a32f14ebb121f50b650cd59d0ee2a07cf13c617efed" + +[[package]] +name = "tungstenite" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a5c7d464221cb0b538a1cd12f6d9127ed1e6bb7f3ffca98fb3cd4c6e3af8175c" +dependencies = [ + "base64", + "byteorder", + "bytes", + "http", + "httparse", + "input_buffer", + "log", + "rand", + "sha-1", + "url", + "utf-8", +] + +[[package]] +name = "typenum" +version = "1.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "373c8a200f9e67a0c95e62a4f52fbf80c23b4381c05a17845531982fa99e6b33" + +[[package]] +name = "unicode-bidi" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49f2bd0c6468a8230e1db229cff8029217cf623c767ea5d60bfbd42729ea54d5" +dependencies = [ + "matches", +] + +[[package]] +name = "unicode-normalization" +version = "0.1.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6fb19cf769fa8c6a80a162df694621ebeb4dafb606470b2b2fce0be40a98a977" +dependencies = [ + "tinyvec", +] + +[[package]] +name = "unicode-segmentation" +version = "1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e83e153d1053cbb5a118eeff7fd5be06ed99153f00dbcd8ae310c5fb2b22edc0" + +[[package]] +name = "unicode-width" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "caaa9d531767d1ff2150b9332433f32a24622147e5ebb1f26409d5da67afd479" + +[[package]] +name = "unicode-xid" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7fe0bb3479651439c9112f72b6c505038574c9fbb575ed1bf3b797fa39dd564" + +[[package]] +name = "url" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "829d4a8476c35c9bf0bbce5a3b23f4106f79728039b726d292bb93bc106787cb" +dependencies = [ + "idna", + "matches", + "percent-encoding", +] + +[[package]] +name = "utf-8" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05e42f7c18b8f902290b009cde6d651262f956c98bc51bca4cd1d511c9cd85c7" + +[[package]] +name = "version_check" +version = "0.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b5a972e5669d67ba988ce3dc826706fb0a8b01471c088cb0b6110b805cc36aed" + +[[package]] +name = "wasi" +version = "0.9.0+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cccddf32554fecc6acb585f82a32a72e28b48f8c4c1883ddfeeeaa96f7d8e519" + +[[package]] +name = "winapi" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419" +dependencies = [ + "winapi-i686-pc-windows-gnu", + "winapi-x86_64-pc-windows-gnu", +] + +[[package]] +name = "winapi-i686-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" + +[[package]] +name = "winapi-util" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70ec6ce85bb158151cae5e5c87f95a8e97d2c0c4b001223f33a334e3ce5de178" +dependencies = [ + "winapi", +] + +[[package]] +name = "winapi-x86_64-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" diff --git a/webrtc/janus/rust/Cargo.toml b/webrtc/janus/rust/Cargo.toml new file mode 100644 index 0000000..29c6aad --- /dev/null +++ b/webrtc/janus/rust/Cargo.toml @@ -0,0 +1,25 @@ +[package] +name = "janus-video-room" +version = "0.1.0" +authors = ["Philippe Normand "] +edition = "2018" +license = "LGPL" + +[dependencies] +futures = "0.3" +structopt = { version = "0.3", default-features = false } +anyhow = "1" +url = "2" +rand = "0.7" +async-tungstenite = { version = "0.7", features = ["gio-runtime"] } +gst = { package = "gstreamer", version = "0.15", features = ["v1_14"] } +gst-webrtc = { package = "gstreamer-webrtc", version = "0.15" } +gst-sdp = { package = "gstreamer-sdp", version = "0.15", features = ["v1_14"] } +serde = "1" +serde_derive = "1" +serde_json = "1.0.53" +http = "0.2" +glib = "0.9" +gio = "0.8" +log = "0.4.8" +env_logger = "0.7.1" diff --git a/webrtc/janus/rust/src/janus.rs b/webrtc/janus/rust/src/janus.rs new file mode 100644 index 0000000..e37adf8 --- /dev/null +++ b/webrtc/janus/rust/src/janus.rs @@ -0,0 +1,707 @@ +// GStreamer +// +// Copyright (C) 2018 maxmcd +// Copyright (C) 2019 Sebastian Dröge +// Copyright (C) 2020 Philippe Normand +// +// This library is free software; you can redistribute it and/or +// modify it under the terms of the GNU Library General Public +// License as published by the Free Software Foundation; either +// version 2 of the License, or (at your option) any later version. +// +// This library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +// Library General Public License for more details. +// +// You should have received a copy of the GNU Library General Public +// License along with this library; if not, write to the +// Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, +// Boston, MA 02110-1301, USA. + +use { + anyhow::{anyhow, bail, Context}, + async_tungstenite::{gio::connect_async, tungstenite}, + futures::channel::mpsc, + futures::sink::{Sink, SinkExt}, + futures::stream::{Stream, StreamExt}, + gst::gst_element_error, + gst::prelude::*, + http::Request, + rand::prelude::*, + serde_derive::{Deserialize, Serialize}, + serde_json::json, + std::sync::{Arc, Mutex, Weak}, + structopt::StructOpt, + tungstenite::Message as WsMessage, +}; + +// upgrade weak reference or return +#[macro_export] +macro_rules! upgrade_weak { + ($x:ident, $r:expr) => {{ + match $x.upgrade() { + Some(o) => o, + None => return $r, + } + }}; + ($x:ident) => { + upgrade_weak!($x, ()) + }; +} + +#[derive(Debug)] +struct VideoParameter { + encoder: &'static str, + encoding_name: &'static str, + payloader: &'static str, +} + +const VP8: VideoParameter = VideoParameter { + encoder: "vp8enc target-bitrate=100000 overshoot=25 undershoot=100 deadline=33000 keyframe-max-dist=1", + encoding_name: "VP8", + payloader: "rtpvp8pay picture-id-mode=2" +}; + +const H264: VideoParameter = VideoParameter { + encoder: "x264enc tune=zerolatency", + encoding_name: "H264", + payloader: "rtph264pay", +}; + +impl std::str::FromStr for VideoParameter { + type Err = anyhow::Error; + + fn from_str(s: &str) -> Result { + match s { + "vp8" => Ok(VP8), + "h264" => Ok(H264), + _ => Err(anyhow!( + "Invalid video parameter: {}. Use either vp8 or h264", + s + )), + } + } +} + +#[derive(Debug, StructOpt)] +pub struct Args { + #[structopt(short, long, default_value = "wss://janus.conf.meetecho.com/ws:8989")] + server: String, + #[structopt(short, long, default_value = "1234")] + room_id: u32, + #[structopt(short, long, default_value = "1234")] + feed_id: u32, + #[structopt(short, long, default_value = "vp8")] + webrtc_video_codec: VideoParameter, +} + +#[derive(Serialize, Deserialize, Debug)] +struct Base { + janus: String, + transaction: Option, + session_id: Option, + sender: Option, +} + +#[derive(Serialize, Deserialize, Debug)] +struct DataHolder { + id: i64, +} + +#[derive(Serialize, Deserialize, Debug)] +struct PluginDataHolder { + videoroom: String, + room: i64, + description: Option, + id: Option, + configured: Option, + video_codec: Option, + unpublished: Option, +} + +#[derive(Serialize, Deserialize, Debug)] +struct PluginHolder { + plugin: String, + data: PluginDataHolder, +} + +#[derive(Serialize, Deserialize, Debug)] +struct IceHolder { + candidate: String, + #[serde(rename = "sdpMLineIndex")] + sdp_mline_index: u32, +} + +#[derive(Serialize, Deserialize, Debug)] +struct JsepHolder { + #[serde(rename = "type")] + type_: String, + sdp: Option, + ice: Option, +} + +#[derive(Serialize, Deserialize, Debug)] +struct JsonReply { + #[serde(flatten)] + base: Base, + data: Option, + #[serde(rename = "plugindata")] + plugin_data: Option, + jsep: Option, +} + +fn transaction_id() -> String { + thread_rng() + .sample_iter(&rand::distributions::Alphanumeric) + .take(30) + .collect() +} + +// Strong reference to the state of one peer +#[derive(Debug, Clone)] +struct Peer(Arc); + +// Weak reference to the state of one peer +#[derive(Debug, Clone)] +struct PeerWeak(Weak); + +impl PeerWeak { + // Try upgrading a weak reference to a strong one + fn upgrade(&self) -> Option { + self.0.upgrade().map(Peer) + } +} + +// To be able to access the Peers's fields directly +impl std::ops::Deref for Peer { + type Target = PeerInner; + + fn deref(&self) -> &PeerInner { + &self.0 + } +} + +#[derive(Clone, Copy, Debug)] +struct ConnectionHandle { + id: i64, + session_id: i64, +} + +// Actual peer state +#[derive(Debug)] +struct PeerInner { + handle: ConnectionHandle, + bin: gst::Bin, + webrtcbin: gst::Element, + send_msg_tx: Arc>>, +} + +impl Peer { + // Downgrade the strong reference to a weak reference + fn downgrade(&self) -> PeerWeak { + PeerWeak(Arc::downgrade(&self.0)) + } + + // Whenever webrtcbin tells us that (re-)negotiation is needed, simply ask + // for a new offer SDP from webrtcbin without any customization and then + // asynchronously send it to the peer via the WebSocket connection + fn on_negotiation_needed(&self) -> Result<(), anyhow::Error> { + info!("starting negotiation with peer"); + + let peer_clone = self.downgrade(); + let promise = gst::Promise::new_with_change_func(move |res| { + let s = res.expect("no answer"); + let peer = upgrade_weak!(peer_clone); + + if let Err(err) = peer.on_offer_created(&s.to_owned()) { + gst_element_error!( + peer.bin, + gst::LibraryError::Failed, + ("Failed to send SDP offer: {:?}", err) + ); + } + }); + + self.webrtcbin + .emit("create-offer", &[&None::, &promise])?; + + Ok(()) + } + + // Once webrtcbin has create the offer SDP for us, handle it by sending it to the peer via the + // WebSocket connection + fn on_offer_created(&self, reply: &gst::Structure) -> Result<(), anyhow::Error> { + let offer = reply + .get_value("offer")? + .get::() + .expect("Invalid argument") + .expect("Invalid offer"); + self.webrtcbin + .emit("set-local-description", &[&offer, &None::])?; + + info!("sending SDP offer to peer: {:?}", offer.get_sdp().as_text()); + + let transaction = transaction_id(); + let sdp_data = offer.get_sdp().as_text()?; + let msg = WsMessage::Text( + json!({ + "janus": "message", + "transaction": transaction, + "session_id": self.handle.session_id, + "handle_id": self.handle.id, + "body": { + "request": "publish", + "audio": true, + "video": true, + }, + "jsep": { + "sdp": sdp_data, + "trickle": true, + "type": "offer" + } + }) + .to_string(), + ); + self.send_msg_tx + .lock() + .expect("Invalid message sender") + .unbounded_send(msg) + .with_context(|| "Failed to send SDP offer".to_string())?; + + Ok(()) + } + + // Once webrtcbin has create the answer SDP for us, handle it by sending it to the peer via the + // WebSocket connection + fn on_answer_created(&self, reply: &gst::Structure) -> Result<(), anyhow::Error> { + let answer = reply + .get_value("answer")? + .get::() + .expect("Invalid argument") + .expect("Invalid answer"); + self.webrtcbin + .emit("set-local-description", &[&answer, &None::])?; + + info!( + "sending SDP answer to peer: {:?}", + answer.get_sdp().as_text() + ); + + Ok(()) + } + + // Handle incoming SDP answers from the peer + fn handle_sdp(&self, type_: &str, sdp: &str) -> Result<(), anyhow::Error> { + if type_ == "answer" { + info!("Received answer:\n{}\n", sdp); + + let ret = gst_sdp::SDPMessage::parse_buffer(sdp.as_bytes()) + .map_err(|_| anyhow!("Failed to parse SDP answer"))?; + let answer = + gst_webrtc::WebRTCSessionDescription::new(gst_webrtc::WebRTCSDPType::Answer, ret); + + self.webrtcbin + .emit("set-remote-description", &[&answer, &None::])?; + + Ok(()) + } else if type_ == "offer" { + info!("Received offer:\n{}\n", sdp); + + let ret = gst_sdp::SDPMessage::parse_buffer(sdp.as_bytes()) + .map_err(|_| anyhow!("Failed to parse SDP offer"))?; + + // And then asynchronously start our pipeline and do the next steps. The + // pipeline needs to be started before we can create an answer + let peer_clone = self.downgrade(); + self.bin.call_async(move |_pipeline| { + let peer = upgrade_weak!(peer_clone); + + let offer = gst_webrtc::WebRTCSessionDescription::new( + gst_webrtc::WebRTCSDPType::Offer, + ret, + ); + + peer.0 + .webrtcbin + .emit("set-remote-description", &[&offer, &None::]) + .expect("Unable to set remote description"); + + let peer_clone = peer.downgrade(); + let promise = gst::Promise::new_with_change_func(move |reply| { + let s = reply.expect("No answer"); + let peer = upgrade_weak!(peer_clone); + + if let Err(err) = peer.on_answer_created(&s.to_owned()) { + gst_element_error!( + peer.bin, + gst::LibraryError::Failed, + ("Failed to send SDP answer: {:?}", err) + ); + } + }); + + peer.0 + .webrtcbin + .emit("create-answer", &[&None::, &promise]) + .expect("Unable to create answer"); + }); + + Ok(()) + } else { + bail!("Sdp type is not \"answer\" but \"{}\"", type_) + } + } + + // Handle incoming ICE candidates from the peer by passing them to webrtcbin + fn handle_ice(&self, sdp_mline_index: u32, candidate: &str) -> Result<(), anyhow::Error> { + info!( + "Received remote ice-candidate {} {}", + sdp_mline_index, candidate + ); + self.webrtcbin + .emit("add-ice-candidate", &[&sdp_mline_index, &candidate])?; + + Ok(()) + } + + // Asynchronously send ICE candidates to the peer via the WebSocket connection as a JSON + // message + fn on_ice_candidate(&self, mlineindex: u32, candidate: String) -> Result<(), anyhow::Error> { + let transaction = transaction_id(); + info!("Sending ICE {} {}", mlineindex, &candidate); + let msg = WsMessage::Text( + json!({ + "janus": "trickle", + "transaction": transaction, + "session_id": self.handle.session_id, + "handle_id": self.handle.id, + "candidate": { + "candidate": candidate, + "sdpMLineIndex": mlineindex + }, + }) + .to_string(), + ); + self.send_msg_tx + .lock() + .expect("Invalid message sender") + .unbounded_send(msg) + .with_context(|| "Failed to send ICE candidate".to_string())?; + + Ok(()) + } +} + +// At least shut down the bin here if it didn't happen so far +impl Drop for PeerInner { + fn drop(&mut self) { + let _ = self.bin.set_state(gst::State::Null); + } +} + +type WsStream = + std::pin::Pin> + Send>>; +type WsSink = std::pin::Pin + Send>>; + +pub struct JanusGateway { + ws_stream: Option, + ws_sink: Option, + handle: ConnectionHandle, + peer: Mutex, + send_ws_msg_rx: Option>, +} + +impl JanusGateway { + pub async fn new(pipeline: gst::Bin) -> Result { + let args = Args::from_args(); + let request = Request::builder() + .uri(&args.server) + .header("Sec-WebSocket-Protocol", "janus-protocol") + .body(())?; + + let (mut ws, _) = connect_async(request).await?; + + let transaction = transaction_id(); + let msg = WsMessage::Text( + json!({ + "janus": "create", + "transaction": transaction, + }) + .to_string(), + ); + ws.send(msg).await?; + + let msg = ws + .next() + .await + .ok_or_else(|| anyhow!("didn't receive anything"))??; + let payload = msg.to_text()?; + let json_msg: JsonReply = serde_json::from_str(payload)?; + assert_eq!(json_msg.base.janus, "success"); + assert_eq!(json_msg.base.transaction, Some(transaction)); + let session_id = json_msg.data.expect("no session id").id; + + let transaction = transaction_id(); + let msg = WsMessage::Text( + json!({ + "janus": "attach", + "transaction": transaction, + "plugin": "janus.plugin.videoroom", + "session_id": session_id, + }) + .to_string(), + ); + ws.send(msg).await?; + + let msg = ws + .next() + .await + .ok_or_else(|| anyhow!("didn't receive anything"))??; + let payload = msg.to_text()?; + let json_msg: JsonReply = serde_json::from_str(payload)?; + assert_eq!(json_msg.base.janus, "success"); + assert_eq!(json_msg.base.transaction, Some(transaction)); + let handle = json_msg.data.expect("no session id").id; + + let transaction = transaction_id(); + let msg = WsMessage::Text( + json!({ + "janus": "message", + "transaction": transaction, + "session_id": session_id, + "handle_id": handle, + "body": { + "request": "join", + "ptype": "publisher", + "room": args.room_id, + "id": args.feed_id, + }, + }) + .to_string(), + ); + ws.send(msg).await?; + + let webrtcbin = pipeline + .get_by_name("webrtcbin") + .expect("can't find webrtcbin"); + + let webrtc_codec = &args.webrtc_video_codec; + let bin_description = &format!( + "{encoder} name=encoder ! {payloader} ! queue ! capsfilter name=webrtc-vsink caps=\"application/x-rtp,media=video,encoding-name={encoding_name},payload=96\"", + encoder=webrtc_codec.encoder, payloader=webrtc_codec.payloader, + encoding_name=webrtc_codec.encoding_name + ); + + let encode_bin = gst::parse_bin_from_description(bin_description, false)?; + encode_bin.set_name("encode-bin")?; + + pipeline.add(&encode_bin).expect("Failed to add encode bin"); + + let video_queue = pipeline.get_by_name("vqueue").expect("No vqueue found"); + let encoder = encode_bin.get_by_name("encoder").expect("No encoder"); + + let srcpad = video_queue + .get_static_pad("src") + .expect("Failed to get video queue src pad"); + let sinkpad = encoder + .get_static_pad("sink") + .expect("Failed to get sink pad from encoder"); + + if let Ok(video_ghost_pad) = gst::GhostPad::new(Some("video_sink"), &sinkpad) { + encode_bin.add_pad(&video_ghost_pad)?; + srcpad.link(&video_ghost_pad)?; + } + + let sinkpad2 = webrtcbin + .get_request_pad("sink_%u") + .expect("Unable to request outgoing webrtcbin pad"); + let vsink = encode_bin + .get_by_name("webrtc-vsink") + .expect("No webrtc-vsink found"); + let srcpad = vsink + .get_static_pad("src") + .expect("Element without src pad"); + if let Ok(webrtc_ghost_pad) = gst::GhostPad::new(Some("webrtc_video_src"), &srcpad) { + encode_bin.add_pad(&webrtc_ghost_pad)?; + webrtc_ghost_pad.link(&sinkpad2)?; + } + + if let Ok(transceiver) = webrtcbin.emit("get-transceiver", &[&0.to_value()]) { + if let Some(t) = transceiver { + if let Ok(obj) = t.get::() { + obj.expect("Invalid transceiver") + .set_property("do-nack", &true.to_value())?; + } + } + } + + let (send_ws_msg_tx, send_ws_msg_rx) = mpsc::unbounded::(); + + let connection_handle = ConnectionHandle { + id: handle, + session_id, + }; + + let peer = Peer(Arc::new(PeerInner { + handle: connection_handle, + bin: pipeline, + webrtcbin, + send_msg_tx: Arc::new(Mutex::new(send_ws_msg_tx)), + })); + + // Connect to on-negotiation-needed to handle sending an Offer + let peer_clone = peer.downgrade(); + peer.webrtcbin + .connect("on-negotiation-needed", false, move |_| { + let peer = upgrade_weak!(peer_clone, None); + if let Err(err) = peer.on_negotiation_needed() { + gst_element_error!( + peer.bin, + gst::LibraryError::Failed, + ("Failed to negotiate: {:?}", err) + ); + } + + None + })?; + + // Whenever there is a new ICE candidate, send it to the peer + let peer_clone = peer.downgrade(); + peer.webrtcbin + .connect("on-ice-candidate", false, move |values| { + let mlineindex = values[1] + .get::() + .expect("Invalid argument") + .expect("Invalid type"); + let candidate = values[2] + .get::() + .expect("Invalid argument") + .expect("Invalid type"); + + let peer = upgrade_weak!(peer_clone, None); + if let Err(err) = peer.on_ice_candidate(mlineindex, candidate) { + gst_element_error!( + peer.bin, + gst::LibraryError::Failed, + ("Failed to send ICE candidate: {:?}", err) + ); + } + + None + })?; + + // Split the websocket into the Sink and Stream + let (ws_sink, ws_stream) = ws.split(); + + Ok(Self { + ws_stream: Some(ws_stream.boxed()), + ws_sink: Some(Box::pin(ws_sink)), + handle: connection_handle, + peer: Mutex::new(peer), + send_ws_msg_rx: Some(send_ws_msg_rx), + }) + } + + pub async fn run(&mut self) -> Result<(), anyhow::Error> { + if let Some(ws_stream) = self.ws_stream.take() { + // Fuse the Stream, required for the select macro + let mut ws_stream = ws_stream.fuse(); + + // Channel for outgoing WebSocket messages from other threads + let send_ws_msg_rx = self + .send_ws_msg_rx + .take() + .expect("Invalid message receiver"); + let mut send_ws_msg_rx = send_ws_msg_rx.fuse(); + + let timer = glib::interval_stream(10_000); + let mut timer_fuse = timer.fuse(); + + let mut sink = self.ws_sink.take().expect("Invalid websocket sink"); + loop { + let ws_msg = futures::select! { + // Handle the WebSocket messages here + ws_msg = ws_stream.select_next_some() => { + match ws_msg? { + WsMessage::Close(_) => { + info!("peer disconnected"); + break + }, + WsMessage::Ping(data) => Some(WsMessage::Pong(data)), + WsMessage::Pong(_) => None, + WsMessage::Binary(_) => None, + WsMessage::Text(text) => { + if let Err(err) = self.handle_websocket_message(&text) { + error!("Failed to parse message: {} ... error: {}", &text, err); + } + None + }, + } + }, + // Handle WebSocket messages we created asynchronously + // to send them out now + ws_msg = send_ws_msg_rx.select_next_some() => Some(ws_msg), + + // Handle keepalive ticks, fired every 10 seconds + ws_msg = timer_fuse.select_next_some() => { + let transaction = transaction_id(); + Some(WsMessage::Text( + json!({ + "janus": "keepalive", + "transaction": transaction, + "handle_id": self.handle.id, + "session_id": self.handle.session_id, + }).to_string(), + )) + }, + // Once we're done, break the loop and return + complete => break, + }; + + // If there's a message to send out, do so now + if let Some(ws_msg) = ws_msg { + sink.send(ws_msg).await?; + } + } + } + Ok(()) + } + + fn handle_jsep(&self, jsep: &JsepHolder) -> Result<(), anyhow::Error> { + if let Some(sdp) = &jsep.sdp { + assert_eq!(jsep.type_, "answer"); + let peer = self.peer.lock().expect("Invalid peer"); + return peer.handle_sdp(&jsep.type_, &sdp); + } else if let Some(ice) = &jsep.ice { + let peer = self.peer.lock().expect("Invalid peer"); + return peer.handle_ice(ice.sdp_mline_index, &ice.candidate); + } + + Ok(()) + } + + // Handle WebSocket messages, both our own as well as WebSocket protocol messages + fn handle_websocket_message(&self, msg: &str) -> Result<(), anyhow::Error> { + trace!("Incoming raw message: {}", msg); + let json_msg: JsonReply = serde_json::from_str(msg)?; + let payload_type = &json_msg.base.janus; + if payload_type == "ack" { + trace!( + "Ack transaction {:#?}, sessionId {:#?}", + json_msg.base.transaction, + json_msg.base.session_id + ); + } else { + debug!("Incoming JSON WebSocket message: {:#?}", json_msg); + } + if payload_type == "event" { + if let Some(_plugin_data) = json_msg.plugin_data { + if let Some(jsep) = json_msg.jsep { + return self.handle_jsep(&jsep); + } + } + } + Ok(()) + } +} diff --git a/webrtc/janus/rust/src/main.rs b/webrtc/janus/rust/src/main.rs new file mode 100644 index 0000000..eace4d8 --- /dev/null +++ b/webrtc/janus/rust/src/main.rs @@ -0,0 +1,186 @@ +// GStreamer +// +// Copyright (C) 2019 Sebastian Dröge +// Copyright (C) 2020 Philippe Normand +// +// This library is free software; you can redistribute it and/or +// modify it under the terms of the GNU Library General Public +// License as published by the Free Software Foundation; either +// version 2 of the License, or (at your option) any later version. +// +// This library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +// Library General Public License for more details. +// +// You should have received a copy of the GNU Library General Public +// License along with this library; if not, write to the +// Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, +// Boston, MA 02110-1301, USA. + +#![recursion_limit = "256"] + +use anyhow::bail; +use gst::gst_element_error; +use gst::prelude::*; +use std::sync::{Arc, Weak}; + +#[macro_use] +extern crate log; + +mod janus; + +// Strong reference to our application state +#[derive(Debug, Clone)] +struct App(Arc); + +// Weak reference to our application state +#[derive(Debug, Clone)] +struct AppWeak(Weak); + +// Actual application state +#[derive(Debug)] +struct AppInner { + pipeline: gst::Pipeline, +} + +// To be able to access the App's fields directly +impl std::ops::Deref for App { + type Target = AppInner; + + fn deref(&self) -> &AppInner { + &self.0 + } +} + +impl AppWeak { + // Try upgrading a weak reference to a strong one + fn upgrade(&self) -> Option { + self.0.upgrade().map(App) + } +} + +impl App { + // Downgrade the strong reference to a weak reference + fn downgrade(&self) -> AppWeak { + AppWeak(Arc::downgrade(&self.0)) + } + + fn new() -> Result { + let pipeline = gst::parse_launch( + &"webrtcbin name=webrtcbin stun-server=stun://stun.l.google.com:19302 \ + videotestsrc pattern=ball ! videoconvert ! queue name=vqueue" + .to_string(), + )?; + + let pipeline = pipeline + .downcast::() + .expect("Couldn't downcast pipeline"); + + let bus = pipeline.get_bus().unwrap(); + let app = App(Arc::new(AppInner { pipeline })); + + let app_weak = app.downgrade(); + bus.add_watch_local(move |_bus, msg| { + let app = upgrade_weak!(app_weak, glib::Continue(false)); + + if app.handle_pipeline_message(msg).is_err() { + return glib::Continue(false); + } + glib::Continue(true) + }) + .expect("Unable to add bus watch"); + Ok(app) + } + + fn handle_pipeline_message(&self, message: &gst::Message) -> Result<(), anyhow::Error> { + use gst::message::MessageView; + + match message.view() { + MessageView::Error(err) => bail!( + "Error from element {}: {} ({})", + err.get_src() + .map(|s| String::from(s.get_path_string())) + .unwrap_or_else(|| String::from("None")), + err.get_error(), + err.get_debug().unwrap_or_else(|| String::from("None")), + ), + MessageView::Warning(warning) => { + println!("Warning: \"{}\"", warning.get_debug().unwrap()); + } + _ => (), + } + Ok(()) + } + + pub async fn run(&self) -> Result<(), anyhow::Error> { + let bin = self.pipeline.clone().upcast::(); + let mut gw = janus::JanusGateway::new(bin).await?; + + // Asynchronously set the pipeline to Playing + self.pipeline.call_async(|pipeline| { + // If this fails, post an error on the bus so we exit + if pipeline.set_state(gst::State::Playing).is_err() { + gst_element_error!( + pipeline, + gst::LibraryError::Failed, + ("Failed to set pipeline to Playing") + ); + } + }); + + gw.run().await?; + Ok(()) + } +} + +// Make sure to shut down the pipeline when it goes out of scope +// to release any system resources +impl Drop for AppInner { + fn drop(&mut self) { + let _ = self.pipeline.set_state(gst::State::Null); + } +} + +// Check if all GStreamer plugins we require are available +fn check_plugins() -> Result<(), anyhow::Error> { + let needed = [ + "videotestsrc", + "videoconvert", + "autodetect", + "vpx", + "webrtc", + "nice", + "dtls", + "srtp", + "rtpmanager", + "rtp", + ]; + + let registry = gst::Registry::get(); + let missing = needed + .iter() + .filter(|n| registry.find_plugin(n).is_none()) + .cloned() + .collect::>(); + + if !missing.is_empty() { + bail!("Missing plugins: {:?}", missing); + } else { + Ok(()) + } +} + +async fn async_main() -> Result<(), anyhow::Error> { + gst::init()?; + check_plugins()?; + let app = App::new()?; + app.run().await?; + Ok(()) +} + +fn main() -> Result<(), anyhow::Error> { + env_logger::init(); + let main_context = glib::MainContext::default(); + main_context.block_on(async_main()) +} -- 2.7.4