From 2f75b7570398d6321e8e852e3bc3f5146f78d4cc Mon Sep 17 00:00:00 2001 From: Lawrence Date: Tue, 24 Feb 2026 09:47:37 +0000 Subject: [PATCH] Add r2r prioritization, QUIC transport, and redis limits # Conflicts: # Cargo.toml # relay/src/main.rs --- Cargo.lock | 330 +++++++++++- Cargo.toml | 7 + relay/Cargo.toml | 4 + relay/src/main.rs | 1222 ++++++++++++++++++++++++++++----------------- 4 files changed, 1091 insertions(+), 472 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 14e7724..18bfd88 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -225,12 +225,24 @@ dependencies = [ "shlex", ] +[[package]] +name = "cesu8" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d43a04d8753f35258c91f8ec639f792891f748a1edbd759cf1dcea3382ad83c" + [[package]] name = "cfg-if" version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9330f8b2ff13f34540b44e946ef35111825727b38d33286ef986142615121801" +[[package]] +name = "cfg_aliases" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724" + [[package]] name = "chrono" version = "0.4.44" @@ -285,7 +297,7 @@ version = "0.1.0" dependencies = [ "serde", "serde_json", - "thiserror", + "thiserror 2.0.18", "tokio", "uuid", ] @@ -399,6 +411,18 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4443176a9f2c162692bd3d352d745ef9413eec5782a80d8fd6f8a1ac692a07f7" +[[package]] +name = "fastbloom" +version = "0.14.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4e7f34442dbe69c60fe8eaf58a8cafff81a1f278816d8ab4db255b3bef4ac3c4" +dependencies = [ + "getrandom 0.3.4", + "libm", + "rand", + "siphasher", +] + [[package]] name = "fastrand" version = "2.3.0" @@ -502,8 +526,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"ff2abc00be7fca6ebc474524697ae276ad847ad0a6b3faa4bcb027e9a4614ad0" dependencies = [ "cfg-if", + "js-sys", "libc", "wasi 0.11.1+wasi-snapshot-preview1", + "wasm-bindgen", ] [[package]] @@ -513,9 +539,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "899def5c37c4fd7b2664648c28120ecec138e4d395b459e5ca34f9cce2dd77fd" dependencies = [ "cfg-if", + "js-sys", "libc", "r-efi", "wasip2", + "wasm-bindgen", ] [[package]] @@ -844,6 +872,28 @@ version = "1.0.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "92ecc6618181def0457392ccd0ee51198e065e016d1d527a7ac1b6dc7c1f09d2" +[[package]] +name = "jni" +version = "0.21.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a87aa2bb7d2af34197c04845522473242e1aa17c12f4935d5856491a7fb8c97" +dependencies = [ + "cesu8", + "cfg-if", + "combine", + "jni-sys", + "log", + "thiserror 1.0.69", + "walkdir", + "windows-sys 0.45.0", +] + +[[package]] +name = "jni-sys" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8eaf4bc02d17cbdd7ff4c7438cafcdf7fb9a4613313ad11b4f8fefe7d3fa0130" + [[package]] name = "jobserver" version = "0.1.34" @@ -898,6 +948,12 @@ version = "0.2.182" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6800badb6cb2082ffd7b6a67e6125bb39f18782f793520caee8cb8846be06112" +[[package]] +name = "libm" +version = "0.2.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6d2cec3eae94f9f509c767b45932f1ada8350c4bdb85af2fcab4a3c14807981" + [[package]] name = "libredox" version = "0.1.12" @@ -929,6 +985,12 @@ version = "0.4.29" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5e5032e24019045c762d3c0f28f5b6b8bbf38563a65908389bf7978758920897" +[[package]] +name = "lru-slab" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"112b39cec0b298b6c1999fee3e31427f74f676e4cb9879ed1a121b43661a4154" + [[package]] name = "matchers" version = "0.2.0" @@ -986,7 +1048,7 @@ dependencies = [ "metrics", "metrics-util", "quanta", - "thiserror", + "thiserror 2.0.18", "tokio", "tracing", ] @@ -1263,6 +1325,63 @@ dependencies = [ "winapi", ] +[[package]] +name = "quinn" +version = "0.11.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9e20a958963c291dc322d98411f541009df2ced7b5a4f2bd52337638cfccf20" +dependencies = [ + "bytes", + "cfg_aliases", + "pin-project-lite", + "quinn-proto", + "quinn-udp", + "rustc-hash", + "rustls", + "socket2", + "thiserror 2.0.18", + "tokio", + "tracing", + "web-time", +] + +[[package]] +name = "quinn-proto" +version = "0.11.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1906b49b0c3bc04b5fe5d86a77925ae6524a19b816ae38ce1e426255f1d8a31" +dependencies = [ + "bytes", + "fastbloom", + "getrandom 0.3.4", + "lru-slab", + "rand", + "ring", + "rustc-hash", + "rustls", + "rustls-pki-types", + "rustls-platform-verifier", + "slab", + "thiserror 2.0.18", + "tinyvec", + "tracing", + "web-time", +] + +[[package]] +name = "quinn-udp" +version = "0.5.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "addec6a0dcad8a8d96a771f815f0eaf55f9d1805756410b39f5fa81332574cbd" +dependencies = [ + "cfg_aliases", + "libc", + "once_cell", + "socket2", + "tracing", + "windows-sys 0.60.2", +] + [[package]] name = "quote" version = "1.0.44" @@ -1334,6 +1453,18 @@ dependencies = [ "bitflags", ] +[[package]] +name = "rcgen" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "48406db8ac1f3cbc7dcdb56ec355343817958a356ff430259bb07baf7607e1e1" +dependencies = [ + "pem", + "ring", + "time", + "yasna", +] + [[package]] name = "redis" version = "0.32.7" @@ -1395,7 +1526,11 @@ dependencies = [ "fastrand", "metrics", "metrics-exporter-prometheus", + "quinn", + "rcgen", 
"redis", + "rustls", + "rustls-pemfile", "serde", "serde_json", "tokio", @@ -1418,6 +1553,12 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "rustc-hash" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "357703d41365b4b27c590e3ed91eabb1b663f07c4c084095e60cbed4362dff0d" + [[package]] name = "rustls" version = "0.23.36" @@ -1425,7 +1566,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c665f33d38cea657d9614f766881e4d510e0eda4239891eea56b4cadcf01801b" dependencies = [ "aws-lc-rs", + "log", "once_cell", + "ring", "rustls-pki-types", "rustls-webpki", "subtle", @@ -1444,15 +1587,52 @@ dependencies = [ "security-framework", ] +[[package]] +name = "rustls-pemfile" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dce314e5fee3f39953d46bb63bb8a46d40c2f8fb7cc5a3b6cab2bde9721d6e50" +dependencies = [ + "rustls-pki-types", +] + [[package]] name = "rustls-pki-types" version = "1.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "be040f8b0a225e40375822a563fa9524378b9d63112f53e19ffff34df5d33fdd" dependencies = [ + "web-time", "zeroize", ] +[[package]] +name = "rustls-platform-verifier" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d99feebc72bae7ab76ba994bb5e121b8d83d910ca40b36e0921f53becc41784" +dependencies = [ + "core-foundation", + "core-foundation-sys", + "jni", + "log", + "once_cell", + "rustls", + "rustls-native-certs", + "rustls-platform-verifier-android", + "rustls-webpki", + "security-framework", + "security-framework-sys", + "webpki-root-certs", + "windows-sys 0.61.2", +] + +[[package]] +name = "rustls-platform-verifier-android" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f87165f0995f63a9fbeea62b64d10b4d9d8e78ec6d7d51fb2125fda7bb36788f" + [[package]] name = "rustls-webpki" version = "0.103.9" 
@@ -1477,6 +1657,15 @@ version = "1.0.23" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9774ba4a74de5f7b1c1451ed6cd5285a32eddb5cccb8cc655a4e50009e06477f" +[[package]] +name = "same-file" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502" +dependencies = [ + "winapi-util", +] + [[package]] name = "schannel" version = "0.1.28" @@ -1646,7 +1835,7 @@ checksum = "0d585997b0ac10be3c5ee635f1bab02d512760d14b7c468801ac8a01d9ae5f1d" dependencies = [ "num-bigint", "num-traits", - "thiserror", + "thiserror 2.0.18", "time", ] @@ -1735,13 +1924,33 @@ dependencies = [ "syn", ] +[[package]] +name = "thiserror" +version = "1.0.69" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6aaf5339b578ea85b50e080feb250a3e8ae8cfcdff9a461c9ec2904bc923f52" +dependencies = [ + "thiserror-impl 1.0.69", +] + [[package]] name = "thiserror" version = "2.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4288b5bcbc7920c07a1149a35cf9590a2aa808e0bc1eafaade0b80947865fbc4" dependencies = [ - "thiserror-impl", + "thiserror-impl 2.0.18", +] + +[[package]] +name = "thiserror-impl" +version = "1.0.69" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1" +dependencies = [ + "proc-macro2", + "quote", + "syn", ] [[package]] @@ -2080,6 +2289,16 @@ version = "0.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" +[[package]] +name = "walkdir" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29790946404f91d9c5d06f9874efddea1dc06c5efe94541a7d6863108e3a5e4b" +dependencies = [ + "same-file", + "winapi-util", +] + [[package]] name = "want" version = "0.3.1" @@ -2220,6 +2439,25 @@ 
dependencies = [ "wasm-bindgen", ] +[[package]] +name = "web-time" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a6580f308b1fad9207618087a65c04e7a10bc77e02c8e84e9b00dd4b12fa0bb" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + +[[package]] +name = "webpki-root-certs" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "804f18a4ac2676ffb4e8b5b5fa9ae38af06df08162314f96a68d2a363e21a8ca" +dependencies = [ + "rustls-pki-types", +] + [[package]] name = "whoami" version = "2.1.1" @@ -2249,6 +2487,15 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" +[[package]] +name = "winapi-util" +version = "0.1.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" +dependencies = [ + "windows-sys 0.61.2", +] + [[package]] name = "winapi-x86_64-pc-windows-gnu" version = "0.4.0" @@ -2314,6 +2561,15 @@ dependencies = [ "windows-link", ] +[[package]] +name = "windows-sys" +version = "0.45.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75283be5efb2831d37ea142365f009c02ec203cd29a3ebecbc093d52315b66d0" +dependencies = [ + "windows-targets 0.42.2", +] + [[package]] name = "windows-sys" version = "0.52.0" @@ -2341,6 +2597,21 @@ dependencies = [ "windows-link", ] +[[package]] +name = "windows-targets" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e5180c00cd44c9b1c88adb3693291f1cd93605ded80c250a75d472756b4d071" +dependencies = [ + "windows_aarch64_gnullvm 0.42.2", + "windows_aarch64_msvc 0.42.2", + "windows_i686_gnu 0.42.2", + "windows_i686_msvc 0.42.2", + "windows_x86_64_gnu 0.42.2", + "windows_x86_64_gnullvm 0.42.2", + "windows_x86_64_msvc 0.42.2", +] + [[package]] name = "windows-targets" version = 
"0.52.6" @@ -2374,6 +2645,12 @@ dependencies = [ "windows_x86_64_msvc 0.53.1", ] +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "597a5118570b68bc08d8d59125332c54f1ba9d9adeedeef5b99b02ba2b0698f8" + [[package]] name = "windows_aarch64_gnullvm" version = "0.52.6" @@ -2386,6 +2663,12 @@ version = "0.53.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a9d8416fa8b42f5c947f8482c43e7d89e73a173cead56d044f6a56104a6d1b53" +[[package]] +name = "windows_aarch64_msvc" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e08e8864a60f06ef0d0ff4ba04124db8b0fb3be5776a5cd47641e942e58c4d43" + [[package]] name = "windows_aarch64_msvc" version = "0.52.6" @@ -2398,6 +2681,12 @@ version = "0.53.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b9d782e804c2f632e395708e99a94275910eb9100b2114651e04744e9b125006" +[[package]] +name = "windows_i686_gnu" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c61d927d8da41da96a81f029489353e68739737d3beca43145c8afec9a31a84f" + [[package]] name = "windows_i686_gnu" version = "0.52.6" @@ -2422,6 +2711,12 @@ version = "0.53.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fa7359d10048f68ab8b09fa71c3daccfb0e9b559aed648a8f95469c27057180c" +[[package]] +name = "windows_i686_msvc" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "44d840b6ec649f480a41c8d80f9c65108b92d89345dd94027bfe06ac444d1060" + [[package]] name = "windows_i686_msvc" version = "0.52.6" @@ -2434,6 +2729,12 @@ version = "0.53.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e7ac75179f18232fe9c285163565a57ef8d3c89254a30685b57d83a38d326c2" +[[package]] +name = "windows_x86_64_gnu" +version = "0.42.2" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "8de912b8b8feb55c064867cf047dda097f92d51efad5b491dfb98f6bbb70cb36" + [[package]] name = "windows_x86_64_gnu" version = "0.52.6" @@ -2446,6 +2747,12 @@ version = "0.53.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c3842cdd74a865a8066ab39c8a7a473c0778a3f29370b5fd6b4b9aa7df4a499" +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26d41b46a36d453748aedef1486d5c7a85db22e56aff34643984ea85514e94a3" + [[package]] name = "windows_x86_64_gnullvm" version = "0.52.6" @@ -2458,6 +2765,12 @@ version = "0.53.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ffa179e2d07eee8ad8f57493436566c7cc30ac536a3379fdf008f47f6bb7ae1" +[[package]] +name = "windows_x86_64_msvc" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9aec5da331524158c6d1a4ac0ab1541149c0b9505fde06423b02f5ef0106b9f0" + [[package]] name = "windows_x86_64_msvc" version = "0.52.6" @@ -2564,6 +2877,15 @@ version = "0.6.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9edde0db4769d2dc68579893f2306b26c6ecfbe0ef499b013d731b7b9247e0b9" +[[package]] +name = "yasna" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e17bb3549cc1321ae1296b9cdc2698e2b6cb1992adfa19a8c72e5b7a738f44cd" +dependencies = [ + "time", +] + [[package]] name = "yoke" version = "0.8.1" diff --git a/Cargo.toml b/Cargo.toml index 578016e..d61df7d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,3 +22,10 @@ chrono = { version = "0.4", features = ["serde", "clock"] } metrics = "0.24" metrics-exporter-prometheus = "0.17" tokio-postgres = { version = "0.7", features = ["with-chrono-0_4", "with-serde_json-1"] } +hmac = "0.12" +sha2 = "0.10" +hex = "0.4" +quinn = "0.11" +rustls = "0.23" +rcgen = "0.12" +rustls-pemfile = 
"2.1" diff --git a/relay/Cargo.toml b/relay/Cargo.toml index 82a8735..eb9b0bd 100644 --- a/relay/Cargo.toml +++ b/relay/Cargo.toml @@ -17,3 +17,7 @@ serde.workspace = true metrics.workspace = true metrics-exporter-prometheus.workspace = true common = { path = "../common" } +quinn.workspace = true +rustls.workspace = true +rcgen.workspace = true +rustls-pemfile.workspace = true diff --git a/relay/src/main.rs b/relay/src/main.rs index 29f814a..951db17 100644 --- a/relay/src/main.rs +++ b/relay/src/main.rs @@ -1,7 +1,10 @@ use std::{ collections::HashMap, + fs::File, + io::BufReader, net::SocketAddr, sync::Arc, + sync::atomic::{AtomicBool, Ordering}, time::{Duration, Instant}, }; @@ -11,14 +14,14 @@ use common::{ minecraft::read_handshake_hostname_and_bytes, protocol::{ ClientFrame, Heartbeat, IncomingTcp, RegisterAccepted, RegisterRequest, RelayForwardPrelude, - R2rFrame, R2rStreamClosed, R2rStreamData, ServerFrame, StreamClosed, StreamData, + ServerFrame, StreamClosed, StreamData, }, }; use redis::AsyncCommands; use serde::Deserialize; use metrics_exporter_prometheus::PrometheusBuilder; use tokio::{ - io::{AsyncReadExt, AsyncWriteExt}, + io::{AsyncReadExt, AsyncWriteExt, copy, copy_bidirectional}, net::{TcpListener, TcpStream}, sync::{Mutex, Notify, RwLock, mpsc}, time::{MissedTickBehavior, interval, timeout}, @@ -26,6 +29,15 @@ use tokio::{ use tracing::{debug, info, warn}; use uuid::Uuid; +use quinn::{ClientConfig as QuinnClientConfig, Endpoint as QuinnEndpoint, ServerConfig as QuinnServerConfig}; +use rcgen::generate_simple_self_signed; +use rustls::RootCertStore; +use rustls::pki_types::{CertificateDer, PrivateKeyDer, PrivatePkcs8KeyDer}; +use rustls::client::danger::{HandshakeSignatureValid, ServerCertVerified, ServerCertVerifier}; +use rustls::Error as RustlsError; +use rustls_pemfile::{certs, pkcs8_private_keys, rsa_private_keys}; +use rustls::{DigitallySignedStruct, SignatureScheme}; + #[derive(Clone)] struct RelayConfig { instance_id: String, @@ -34,10 
+46,22 @@ struct RelayConfig { player_bind: String, r2r_bind: String, r2r_advertise_addr: String, + r2r_transport: String, + r2r_quic_server_name: String, + r2r_quic_insecure: bool, + r2r_quic_cert: Option, + r2r_quic_key: Option, domain: String, heartbeat_timeout: Duration, registry_ttl_secs: u64, r2r_connect_timeout: Duration, + control_queue_depth: usize, + stream_queue_depth: usize, + control_queue_policy: QueuePolicy, + stream_queue_policy: QueuePolicy, + max_streams_per_session: u64, + max_streams_per_user: u64, + limit_ttl_secs: u64, } impl RelayConfig { @@ -49,6 +73,16 @@ impl RelayConfig { let r2r_bind = std::env::var("RELAY_R2R_BIND").unwrap_or_else(|_| "0.0.0.0:7001".to_string()); let r2r_advertise_addr = std::env::var("RELAY_R2R_ADVERTISE_ADDR") .unwrap_or_else(|_| guess_advertise_addr(&r2r_bind)); + let r2r_transport = std::env::var("RELAY_R2R_TRANSPORT").unwrap_or_else(|_| "tcp".to_string()); + let r2r_quic_server_name = + std::env::var("RELAY_R2R_QUIC_SERVER_NAME").unwrap_or_else(|_| "relay".to_string()); + let r2r_quic_insecure = std::env::var("RELAY_R2R_QUIC_INSECURE") + .ok() + .as_deref() + .map(|v| v == "1" || v.eq_ignore_ascii_case("true")) + .unwrap_or(false); + let r2r_quic_cert = std::env::var("RELAY_R2R_QUIC_CERT").ok(); + let r2r_quic_key = std::env::var("RELAY_R2R_QUIC_KEY").ok(); Self { instance_id: std::env::var("RELAY_INSTANCE_ID") @@ -58,6 +92,11 @@ impl RelayConfig { player_bind, r2r_bind, r2r_advertise_addr, + r2r_transport, + r2r_quic_server_name, + r2r_quic_insecure, + r2r_quic_cert, + r2r_quic_key, domain: std::env::var("RELAY_BASE_DOMAIN").unwrap_or_else(|_| "dvv.one".to_string()), heartbeat_timeout: Duration::from_secs( std::env::var("RELAY_HEARTBEAT_TIMEOUT_SECS") @@ -75,6 +114,28 @@ impl RelayConfig { .and_then(|v| v.parse().ok()) .unwrap_or(3), ), + control_queue_depth: std::env::var("RELAY_CONTROL_QUEUE_DEPTH") + .ok() + .and_then(|v| v.parse().ok()) + .unwrap_or(512), + stream_queue_depth: 
std::env::var("RELAY_STREAM_QUEUE_DEPTH") + .ok() + .and_then(|v| v.parse().ok()) + .unwrap_or(128), + control_queue_policy: QueuePolicy::from_env("RELAY_CONTROL_QUEUE_POLICY"), + stream_queue_policy: QueuePolicy::from_env("RELAY_STREAM_QUEUE_POLICY"), + max_streams_per_session: std::env::var("RELAY_MAX_STREAMS_PER_SESSION") + .ok() + .and_then(|v| v.parse().ok()) + .unwrap_or(0), + max_streams_per_user: std::env::var("RELAY_MAX_STREAMS_PER_USER") + .ok() + .and_then(|v| v.parse().ok()) + .unwrap_or(0), + limit_ttl_secs: std::env::var("RELAY_LIMIT_TTL_SECS") + .ok() + .and_then(|v| v.parse().ok()) + .unwrap_or(30), } } } @@ -82,8 +143,10 @@ impl RelayConfig { #[derive(Clone)] struct SessionHandle { session_id: String, - tx: mpsc::Sender, + tx: ControlSender, stream_sinks: Arc>>>>, + stream_priorities: Arc>>, + user_id: String, last_heartbeat: Instant, } @@ -107,21 +170,6 @@ impl RelayState { type SharedState = Arc>; -#[derive(Clone)] -struct R2rManager { - outbound: Arc>>>, - ingress_stream_sinks: Arc>>>>, -} - -impl R2rManager { - fn new() -> Self { - Self { - outbound: Arc::new(Mutex::new(HashMap::new())), - ingress_stream_sinks: Arc::new(RwLock::new(HashMap::new())), - } - } -} - #[derive(Clone)] struct RelayGuards { player_ip: Arc>>, @@ -134,11 +182,6 @@ struct RelayGuards { reg_ip_burst: f64, session_bw_rate_bytes: f64, session_bw_burst_bytes: f64, - redis: Option, - player_global_window_secs: u64, - player_global_limit: i64, - reg_global_window_secs: u64, - reg_global_limit: i64, } #[derive(Debug, Clone)] @@ -155,6 +198,74 @@ enum SessionDir { EgressToClient, } +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum QueuePolicy { + Drop, + Deny, +} + +impl QueuePolicy { + fn from_env(var: &str) -> Self { + match std::env::var(var) + .ok() + .map(|v| v.to_ascii_lowercase()) + .as_deref() + { + Some("deny") => QueuePolicy::Deny, + _ => QueuePolicy::Drop, + } + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum StreamPriority { + High, + Normal, +} + 
+#[derive(Clone)] +struct ControlSender { + high_tx: mpsc::Sender, + low_tx: mpsc::Sender, + policy: QueuePolicy, +} + +impl ControlSender { + fn new(depth: usize, policy: QueuePolicy) -> (Self, ControlReceiver) { + let (high_tx, high_rx) = mpsc::channel(depth.max(1)); + let (low_tx, low_rx) = mpsc::channel(depth.max(1)); + ( + Self { + high_tx, + low_tx, + policy, + }, + ControlReceiver { high_rx, low_rx }, + ) + } + + async fn send_high(&self, frame: ServerFrame) -> bool { + self.send(frame, true).await + } + + async fn send_low(&self, frame: ServerFrame) -> bool { + self.send(frame, false).await + } + + async fn send(&self, frame: ServerFrame, high: bool) -> bool { + let tx = if high { &self.high_tx } else { &self.low_tx }; + match self.policy { + QueuePolicy::Drop => tx.try_send(frame).is_ok(), + QueuePolicy::Deny => tx.send(frame).await.is_ok(), + } + } +} + +struct ControlReceiver { + high_rx: mpsc::Receiver, + low_rx: mpsc::Receiver, +} + impl BucketState { fn new(capacity: f64, rate_per_sec: f64) -> Self { Self { @@ -191,7 +302,7 @@ impl BucketState { } impl RelayGuards { - async fn from_env() -> Self { + fn from_env() -> Self { let player_ip_rate = std::env::var("RELAY_PLAYER_CONNECTS_PER_SEC") .ok() .and_then(|v| v.parse().ok()) @@ -221,29 +332,6 @@ impl RelayGuards { .and_then(|v| v.parse().ok()) .unwrap_or(512.0) * 1024.0; - let redis = match std::env::var("REDIS_URL") { - Ok(url) => match redis::Client::open(url) { - Ok(client) => redis::aio::ConnectionManager::new(client).await.ok(), - Err(_) => None, - }, - Err(_) => None, - }; - let player_global_window_secs = std::env::var("RELAY_PLAYER_GLOBAL_WINDOW_SECS") - .ok() - .and_then(|v| v.parse().ok()) - .unwrap_or(10); - let player_global_limit = std::env::var("RELAY_PLAYER_GLOBAL_LIMIT") - .ok() - .and_then(|v| v.parse().ok()) - .unwrap_or(50); - let reg_global_window_secs = std::env::var("RELAY_REG_GLOBAL_WINDOW_SECS") - .ok() - .and_then(|v| v.parse().ok()) - .unwrap_or(60); - let reg_global_limit = 
std::env::var("RELAY_REG_GLOBAL_LIMIT") - .ok() - .and_then(|v| v.parse().ok()) - .unwrap_or(10); Self { player_ip: Arc::new(Mutex::new(HashMap::new())), @@ -256,11 +344,6 @@ impl RelayGuards { reg_ip_burst, session_bw_rate_bytes, session_bw_burst_bytes, - redis, - player_global_window_secs, - player_global_limit, - reg_global_window_secs, - reg_global_limit, } } @@ -282,18 +365,7 @@ impl RelayGuards { let bucket = guard .entry(ip.to_string()) .or_insert_with(|| BucketState::new(burst, rate)); - let local_ok = bucket.reserve_delay(1).is_zero(); - drop(guard); - if !local_ok { - return false; - } - - let (window_secs, limit, scope) = if player { - (self.player_global_window_secs, self.player_global_limit, "mc") - } else { - (self.reg_global_window_secs, self.reg_global_limit, "reg") - }; - self.redis_allow_ip_window(ip, scope, window_secs, limit).await + bucket.reserve_delay(1).is_zero() } async fn throttle_session_bytes(&self, session_id: &str, dir: SessionDir, bytes: usize) { @@ -317,31 +389,231 @@ impl RelayGuards { self.session_ingress.lock().await.remove(session_id); self.session_egress.lock().await.remove(session_id); } +} - async fn redis_allow_ip_window( - &self, - ip: &str, - scope: &str, - window_secs: u64, - limit: i64, - ) -> bool { - let Some(mut conn) = self.redis.clone() else { +#[derive(Clone)] +struct RedisLimits { + conn: Option, + ttl_secs: u64, +} + +impl RedisLimits { + fn new(conn: Option, ttl_secs: u64) -> Self { + Self { conn, ttl_secs } + } + + async fn allow_stream_open(&self, session_id: &str, user_id: &str, max_session: u64, max_user: u64) -> bool { + let Some(mut conn) = self.conn.clone() else { return true; }; - let key = format!("ratelimit:ip:{ip}:{scope}"); - let res: redis::RedisResult = async { - let count: i64 = conn.incr(&key, 1).await?; - if count == 1 { - let _: bool = conn.expire(&key, window_secs as i64).await?; - } - Ok(count) - } - .await; + let ttl = self.ttl_secs as usize; + let session_key = 
format!("limit:streams:session:{session_id}"); + let user_key = format!("limit:streams:user:{user_id}"); + let max_session_i64 = if max_session == 0 { i64::MAX } else { max_session as i64 }; + let max_user_i64 = if max_user == 0 { i64::MAX } else { max_user as i64 }; + let script = redis::Script::new( + r#" +local session_key = KEYS[1] +local user_key = KEYS[2] +local max_session = tonumber(ARGV[1]) +local max_user = tonumber(ARGV[2]) +local ttl = tonumber(ARGV[3]) +local s = tonumber(redis.call('get', session_key) or '0') +local u = tonumber(redis.call('get', user_key) or '0') +if s + 1 > max_session then + return 0 +end +if u + 1 > max_user then + return 0 +end +redis.call('incr', session_key) +redis.call('expire', session_key, ttl) +redis.call('incr', user_key) +redis.call('expire', user_key, ttl) +return 1 +"#, + ); + let res: redis::RedisResult = script + .key(session_key) + .key(user_key) + .arg(max_session_i64) + .arg(max_user_i64) + .arg(ttl) + .invoke_async(&mut conn) + .await; match res { - Ok(count) => count <= limit, - Err(_) => true, + Ok(1) => true, + Ok(_) => false, + Err(e) => { + warn!(error = %e, "redis stream limit script failed; allowing"); + true + } } } + + async fn release_stream(&self, session_id: &str, user_id: &str) { + let Some(mut conn) = self.conn.clone() else { return; }; + let session_key = format!("limit:streams:session:{session_id}"); + let user_key = format!("limit:streams:user:{user_id}"); + let script = redis::Script::new( + r#" +local session_key = KEYS[1] +local user_key = KEYS[2] +local s = tonumber(redis.call('get', session_key) or '0') +if s > 0 then redis.call('decr', session_key) end +local u = tonumber(redis.call('get', user_key) or '0') +if u > 0 then redis.call('decr', user_key) end +return 1 +"#, + ); + let _: redis::RedisResult = script + .key(session_key) + .key(user_key) + .invoke_async(&mut conn) + .await; + } +} + +#[derive(Clone)] +struct QuicR2r { + server: QuinnEndpoint, + client: QuinnEndpoint, + server_name: 
String, +} + +#[derive(Debug)] +struct InsecureVerifier; + +impl ServerCertVerifier for InsecureVerifier { + fn verify_server_cert( + &self, + _end_entity: &CertificateDer<'_>, + _intermediates: &[CertificateDer<'_>], + _server_name: &rustls::pki_types::ServerName<'_>, + _ocsp_response: &[u8], + _now: rustls::pki_types::UnixTime, + ) -> Result { + Ok(ServerCertVerified::assertion()) + } + + fn verify_tls12_signature( + &self, + _message: &[u8], + _cert: &CertificateDer<'_>, + _dss: &DigitallySignedStruct, + ) -> Result { + Ok(HandshakeSignatureValid::assertion()) + } + + fn verify_tls13_signature( + &self, + _message: &[u8], + _cert: &CertificateDer<'_>, + _dss: &DigitallySignedStruct, + ) -> Result { + Ok(HandshakeSignatureValid::assertion()) + } + + fn supported_verify_schemes(&self) -> Vec { + vec![ + SignatureScheme::ECDSA_NISTP256_SHA256, + SignatureScheme::RSA_PSS_SHA256, + SignatureScheme::RSA_PKCS1_SHA256, + ] + } +} + +fn load_cert_chain(path: &str) -> Result>> { + let file = File::open(path).context("open quic cert file")?; + let mut reader = BufReader::new(file); + let der_certs: Vec> = certs(&mut reader) + .collect::>>() + .context("read quic certs")?; + Ok(der_certs) +} + +fn load_private_key(path: &str) -> Result> { + let file = File::open(path).context("open quic key file")?; + let mut reader = BufReader::new(file); + let mut keys: Vec> = pkcs8_private_keys(&mut reader) + .map(|key| key.map(PrivateKeyDer::from)) + .collect::>>() + .context("read quic pkcs8 keys")?; + if let Some(key) = keys.pop() { + return Ok(key); + } + let file = File::open(path).context("open quic key file")?; + let mut reader = BufReader::new(file); + let mut keys: Vec> = rsa_private_keys(&mut reader) + .map(|key| key.map(PrivateKeyDer::from)) + .collect::>>() + .context("read quic rsa keys")?; + keys.pop().context("no quic private keys found") +} + +fn build_quic_server_config(cfg: &RelayConfig) -> Result { + let (cert_chain, key) = match (&cfg.r2r_quic_cert, &cfg.r2r_quic_key) 
{ + (Some(cert_path), Some(key_path)) => (load_cert_chain(cert_path)?, load_private_key(key_path)?), + _ => { + let cert = generate_simple_self_signed(vec![cfg.r2r_quic_server_name.clone()]) + .context("generate quic self-signed cert")?; + let key = PrivateKeyDer::from(PrivatePkcs8KeyDer::from( + cert.serialize_private_key_der(), + )); + let cert = CertificateDer::from(cert.serialize_der().context("serialize quic cert")?); + (vec![cert], key) + } + }; + let server_config = QuinnServerConfig::with_single_cert(cert_chain, key) + .context("build quic server config")?; + Ok(server_config) +} + +fn build_quic_client_config(cfg: &RelayConfig) -> Result { + let mut root_store = RootCertStore::empty(); + if let Some(cert_path) = &cfg.r2r_quic_cert { + let (valid, _invalid) = root_store.add_parsable_certificates(load_cert_chain(cert_path)?); + if valid == 0 { + anyhow::bail!("no valid quic certs loaded from {}", cert_path); + } + } + + if cfg.r2r_quic_insecure { + let provider = rustls::crypto::ring::default_provider(); + let mut client_crypto = rustls::ClientConfig::builder_with_provider(provider.into()) + .with_protocol_versions(&[&rustls::version::TLS13]) + .context("configure quic client tls versions")? + .dangerous() + .with_custom_certificate_verifier(Arc::new(InsecureVerifier)) + .with_no_client_auth(); + client_crypto.enable_early_data = true; + let crypto = quinn::crypto::rustls::QuicClientConfig::try_from(client_crypto) + .context("build quic client config")?; + Ok(QuinnClientConfig::new(Arc::new(crypto))) + } else { + if root_store.is_empty() { + anyhow::bail!("quic client requires RELAY_R2R_QUIC_CERT or RELAY_R2R_QUIC_INSECURE=1"); + } + Ok(QuinnClientConfig::with_root_certificates(Arc::new(root_store)) + .context("build quic client config")?) 
+ } +} + +fn build_quic_r2r(cfg: &RelayConfig) -> Result { + let server_config = build_quic_server_config(cfg)?; + let bind: std::net::SocketAddr = cfg.r2r_bind.parse().context("parse quic r2r bind")?; + let server = QuinnEndpoint::server(server_config, bind).context("bind quic r2r")?; + + let mut client = QuinnEndpoint::client("0.0.0.0:0".parse().context("parse quic client bind")?) + .context("bind quic client")?; + let client_config = build_quic_client_config(cfg)?; + client.set_default_client_config(client_config); + + Ok(QuicR2r { + server, + client, + server_name: cfg.r2r_quic_server_name.clone(), + }) } #[derive(Debug, Clone, Deserialize)] @@ -517,6 +789,10 @@ impl RedisRegistry { let raw: Option = conn.get(format!("relay:instance:{instance_id}")).await.ok()?; serde_json::from_str(&raw?).ok() } + + fn limits(&self, ttl_secs: u64) -> RedisLimits { + RedisLimits::new(self.conn.clone(), ttl_secs) + } } #[tokio::main] @@ -531,8 +807,8 @@ async fn main() -> Result<()> { let cfg = RelayConfig::from_env(); let registry = RedisRegistry::from_env(&cfg).await; - let guards = Arc::new(RelayGuards::from_env().await); - let r2r = Arc::new(R2rManager::new()); + let limits = Arc::new(registry.limits(cfg.limit_ttl_secs)); + let guards = Arc::new(RelayGuards::from_env()); registry.register_instance().await; let control_listener = TcpListener::bind(&cfg.control_bind) @@ -541,11 +817,22 @@ async fn main() -> Result<()> { let player_listener = TcpListener::bind(&cfg.player_bind) .await .with_context(|| format!("bind player {}", cfg.player_bind))?; - let r2r_listener = TcpListener::bind(&cfg.r2r_bind) - .await - .with_context(|| format!("bind r2r {}", cfg.r2r_bind))?; + let r2r_listener = if cfg.r2r_transport == "tcp" { + Some( + TcpListener::bind(&cfg.r2r_bind) + .await + .with_context(|| format!("bind r2r {}", cfg.r2r_bind))?, + ) + } else { + None + }; + let quic = if cfg.r2r_transport == "quic" { + Some(Arc::new(build_quic_r2r(&cfg)?)) + } else { + None + }; - 
info!(instance_id = %cfg.instance_id, region = %cfg.region, control = %cfg.control_bind, player = %cfg.player_bind, r2r = %cfg.r2r_bind, r2r_advertise = %cfg.r2r_advertise_addr, "relay started"); + info!(instance_id = %cfg.instance_id, region = %cfg.region, control = %cfg.control_bind, player = %cfg.player_bind, r2r = %cfg.r2r_bind, r2r_advertise = %cfg.r2r_advertise_addr, r2r_transport = %cfg.r2r_transport, "relay started"); metrics::gauge!("relay_drain_state").set(0.0); let shutdown = Arc::new(Notify::new()); @@ -566,17 +853,31 @@ async fn main() -> Result<()> { state.clone(), registry.clone(), guards.clone(), - r2r.clone(), - shutdown.clone(), - )); - let r2r_task = tokio::spawn(run_r2r_accept_loop( - r2r_listener, - cfg.clone(), - state.clone(), - guards.clone(), - r2r.clone(), + limits.clone(), + quic.clone(), shutdown.clone(), )); + let r2r_task = if let Some(listener) = r2r_listener { + tokio::spawn(run_r2r_accept_loop( + listener, + cfg.clone(), + state.clone(), + guards.clone(), + limits.clone(), + shutdown.clone(), + )) + } else if let Some(quic) = quic.clone() { + tokio::spawn(run_quic_r2r_accept_loop( + quic, + cfg.clone(), + state.clone(), + guards.clone(), + limits.clone(), + shutdown.clone(), + )) + } else { + tokio::spawn(async { Ok::<(), anyhow::Error>(()) }) + }; tokio::pin!(heartbeat_task); tokio::pin!(control_task); @@ -652,7 +953,8 @@ async fn run_player_accept_loop( state: SharedState, registry: RedisRegistry, guards: Arc, - r2r: Arc, + limits: Arc, + quic: Option>, shutdown: Arc, ) -> Result<()> { loop { @@ -668,9 +970,10 @@ async fn run_player_accept_loop( let state = state.clone(); let registry = registry.clone(); let guards = guards.clone(); - let r2r = r2r.clone(); + let limits = limits.clone(); + let quic = quic.clone(); tokio::spawn(async move { - if let Err(e) = handle_player_conn(stream, addr, cfg, state, registry, guards, r2r).await { + if let Err(e) = handle_player_conn(stream, addr, cfg, state, registry, guards, limits, quic).await 
{ debug!(peer = %addr, error = %e, "player connection closed"); } }); @@ -685,7 +988,7 @@ async fn run_r2r_accept_loop( cfg: RelayConfig, state: SharedState, guards: Arc, - r2r: Arc, + limits: Arc, shutdown: Arc, ) -> Result<()> { loop { @@ -700,9 +1003,9 @@ async fn run_r2r_accept_loop( let cfg = cfg.clone(); let state = state.clone(); let guards = guards.clone(); - let r2r = r2r.clone(); + let limits = limits.clone(); tokio::spawn(async move { - if let Err(e) = handle_r2r_conn(stream, addr, cfg, state, guards, r2r).await { + if let Err(e) = handle_r2r_conn(stream, addr, cfg, state, guards, limits).await { warn!(peer = %addr, error = %e, "r2r connection ended with error"); } }); @@ -712,6 +1015,49 @@ async fn run_r2r_accept_loop( Ok(()) } +async fn run_quic_r2r_accept_loop( + quic: Arc, + cfg: RelayConfig, + state: SharedState, + guards: Arc, + limits: Arc, + shutdown: Arc, +) -> Result<()> { + loop { + tokio::select! { + _ = shutdown.notified() => break, + Some(connecting) = quic.server.accept() => { + let cfg = cfg.clone(); + let state = state.clone(); + let guards = guards.clone(); + let limits = limits.clone(); + tokio::spawn(async move { + let connection = match connecting.await { + Ok(conn) => conn, + Err(e) => { warn!(error = %e, "quic r2r connect failed"); return; } + }; + loop { + let Ok((send, recv)) = connection.accept_bi().await else { + break; + }; + let addr = connection.remote_address(); + let cfg = cfg.clone(); + let state = state.clone(); + let guards = guards.clone(); + let limits = limits.clone(); + tokio::spawn(async move { + if let Err(e) = handle_r2r_quic_conn(send, recv, addr, cfg, state, guards, limits).await { + warn!(peer = %addr, error = %e, "quic r2r stream ended with error"); + } + }); + } + }); + } + } + } + Ok(()) +} + #[tracing::instrument(skip(stream, state, registry, guards, cfg), fields(peer = %addr))] async fn handle_control_conn( stream: TcpStream, @@ -740,11 +1086,12 @@ async fn handle_control_conn( anyhow::bail!("invalid 
token"); } - let (tx, mut rx) = mpsc::channel::(512); + let (tx, mut rx) = ControlSender::new(cfg.control_queue_depth, cfg.control_queue_policy); let session_id = Uuid::new_v4().to_string(); let fqdn = assign_fqdn(&cfg, &register); let user_id = fake_user_id_from_token(&register.token); let stream_sinks = Arc::new(RwLock::new(HashMap::>>::new())); + let stream_priorities = Arc::new(RwLock::new(HashMap::::new())); { let mut guard = state.write().await; @@ -755,6 +1102,8 @@ async fn handle_control_conn( session_id: session_id.clone(), tx: tx.clone(), stream_sinks: stream_sinks.clone(), + stream_priorities: stream_priorities.clone(), + user_id: user_id.clone(), last_heartbeat: Instant::now(), }, ); @@ -770,12 +1119,7 @@ async fn handle_control_conn( info!(peer = %addr, user_id = %user_id, fqdn = %fqdn, session_id = %session_id, "client registered"); metrics::counter!("relay_tunnel_registrations_total").increment(1); - let write_task = tokio::spawn(async move { - while let Some(frame) = rx.recv().await { - write_frame(&mut writer, &frame).await?; - } - Ok::<(), anyhow::Error>(()) - }); + let write_task = tokio::spawn(async move { run_control_writer(&mut writer, &mut rx).await }); let read_result = control_read_loop( &mut reader, @@ -785,6 +1129,7 @@ async fn handle_control_conn( &session_id, &fqdn, &user_id, + cfg.stream_queue_policy, cfg.heartbeat_timeout, ).await; @@ -813,6 +1158,7 @@ async fn control_read_loop( session_id: &str, fqdn: &str, user_id: &str, + stream_queue_policy: QueuePolicy, heartbeat_timeout: Duration, ) -> Result<()> { loop { @@ -837,7 +1183,10 @@ async fn control_read_loop( .await; let sink = lookup_stream_sink(state, session_id, &stream_id).await; if let Some(tx) = sink { - if tx.send(data).await.is_err() { + if !send_stream_chunk(&tx, data, stream_queue_policy).await { + metrics::counter!("relay_queue_drops_total", "queue" => "stream_ingress").increment(1); + } + if tx.is_closed() { remove_stream_sink(state, session_id, &stream_id).await; } } @@
-851,7 +1200,33 @@ async fn control_read_loop( } } -#[tracing::instrument(skip(stream, cfg, state, registry, guards, r2r), fields(peer = %addr))] +async fn run_control_writer( + writer: &mut tokio::net::tcp::OwnedWriteHalf, + rx: &mut ControlReceiver, +) -> Result<()> { + loop { + tokio::select! { + biased; + Some(frame) = rx.high_rx.recv() => { + write_frame(writer, &frame).await?; + } + Some(frame) = rx.low_rx.recv() => { + write_frame(writer, &frame).await?; + } + else => break, + } + } + Ok(()) +} + +async fn send_stream_chunk(tx: &mpsc::Sender>, data: Vec, policy: QueuePolicy) -> bool { + match policy { + QueuePolicy::Drop => tx.try_send(data).is_ok(), + QueuePolicy::Deny => tx.send(data).await.is_ok(), + } +} + +#[tracing::instrument(skip(stream, cfg, state, registry, guards, limits, quic), fields(peer = %addr))] async fn handle_player_conn( mut stream: TcpStream, addr: SocketAddr, @@ -859,7 +1234,8 @@ async fn handle_player_conn( state: SharedState, registry: RedisRegistry, guards: Arc, - r2r: Arc, + limits: Arc, + quic: Option>, ) -> Result<()> { if !guards.allow_player_ip(&addr.ip().to_string()).await { metrics::counter!("relay_rate_limited_total", "scope" => "player_ip").increment(1); @@ -881,6 +1257,8 @@ async fn handle_player_conn( None, "direct", guards, + limits, + &cfg, ) .await; } @@ -890,38 +1268,101 @@ async fn handle_player_conn( debug!(peer = %addr, hostname = %hostname, session_id = %route.session_id, "route points to self but local session missing"); return Ok(()); } - return proxy_player_to_owner(stream, addr, hostname, initial_data, route, cfg, registry, guards, r2r).await; + return proxy_player_to_owner(stream, addr, hostname, initial_data, route, cfg, registry, quic).await; } debug!(peer = %addr, hostname = %hostname, "no tunnel for hostname"); Ok(()) } -#[tracing::instrument(skip(stream, cfg, state, guards, r2r), fields(peer = %addr))] +#[tracing::instrument(skip(stream, cfg, state, guards, limits), fields(peer = %addr))] async fn 
handle_r2r_conn( - stream: TcpStream, + mut stream: TcpStream, addr: SocketAddr, cfg: RelayConfig, state: SharedState, guards: Arc, - r2r: Arc, + limits: Arc, ) -> Result<()> { - handle_r2r_multiplex_conn(stream, addr, cfg, state, guards, r2r) - .await - .with_context(|| format!("r2r multiplex failed from {addr}")) + let prelude: RelayForwardPrelude = read_frame(&mut stream).await.context("read r2r prelude")?; + if prelude.version != 1 { + anyhow::bail!("unsupported r2r prelude version {}", prelude.version); + } + if prelude.hop_count > 1 { + anyhow::bail!("invalid hop_count {}", prelude.hop_count); + } + + let session = local_session_for_session_id(&state, &prelude.session_id).await; + let Some(session) = session else { + anyhow::bail!("owner session not found for {}", prelude.session_id); + }; + + attach_player_socket_to_session( + stream, + session, + prelude.fqdn.clone(), + prelude.peer_addr, + prelude.initial_data, + Some(prelude.stream_id), + "r2r", + guards, + limits, + &cfg, + ) + .await + .with_context(|| format!("r2r attach failed from {addr}")) } -#[tracing::instrument(skip(player_stream, route, cfg, registry, guards, r2r), fields(peer = %player_addr, hostname = %hostname))] +#[tracing::instrument(skip(send, recv, cfg, state, guards, limits), fields(peer = %addr))] +async fn handle_r2r_quic_conn( + send: quinn::SendStream, + mut recv: quinn::RecvStream, + addr: SocketAddr, + cfg: RelayConfig, + state: SharedState, + guards: Arc, + limits: Arc, +) -> Result<()> { + let prelude: RelayForwardPrelude = read_frame(&mut recv).await.context("read quic r2r prelude")?; + if prelude.version != 1 { + anyhow::bail!("unsupported r2r prelude version {}", prelude.version); + } + if prelude.hop_count > 1 { + anyhow::bail!("invalid hop_count {}", prelude.hop_count); + } + + let session = local_session_for_session_id(&state, &prelude.session_id).await; + let Some(session) = session else { + anyhow::bail!("owner session not found for {}", prelude.session_id); + }; + + 
attach_r2r_stream_to_session( + recv, + send, + session, + prelude.fqdn.clone(), + prelude.peer_addr, + prelude.initial_data, + Some(prelude.stream_id), + "r2r", + guards, + limits, + &cfg, + ) + .await + .with_context(|| format!("quic r2r attach failed from {addr}")) +} + +#[tracing::instrument(skip(player_stream, route, cfg, registry, quic), fields(peer = %player_addr, hostname = %hostname))] async fn proxy_player_to_owner( - player_stream: TcpStream, + mut player_stream: TcpStream, player_addr: SocketAddr, hostname: String, initial_data: Vec, route: TunnelRouteRecord, cfg: RelayConfig, registry: RedisRegistry, - guards: Arc, - r2r: Arc, + quic: Option>, ) -> Result<()> { let redis_lookup_started = Instant::now(); let owner = registry @@ -936,326 +1377,53 @@ async fn proxy_player_to_owner( .with_context(|| format!("owner {} missing r2r_addr", route.instance_id))?; let r2r_connect_started = Instant::now(); - metrics::histogram!("relay_r2r_connect_latency_ms") - .record(r2r_connect_started.elapsed().as_secs_f64() * 1000.0); - let prelude = RelayForwardPrelude { version: 1, - session_id: route.session_id.clone(), + session_id: route.session_id, fqdn: hostname.clone(), stream_id: Uuid::new_v4().to_string(), peer_addr: player_addr.to_string(), - origin_instance_id: cfg.instance_id.clone(), + origin_instance_id: cfg.instance_id, hop_count: 1, initial_data, }; - proxy_player_to_owner_pooled( - player_stream, - player_addr, - hostname, - route.instance_id, - r2r_addr, - prelude, - route.session_id, - cfg, - guards, - r2r, - ) - .await -} + if cfg.r2r_transport == "quic" { + let quic = quic.context("quic transport requested but not configured")?; + let addr: std::net::SocketAddr = r2r_addr.parse().context("parse r2r quic addr")?; + let connecting = quic + .client + .connect(addr, &quic.server_name) + .context("quic connect")?; + let connection = timeout(cfg.r2r_connect_timeout, connecting) + .await + .context("r2r quic connect timeout")??; + 
metrics::histogram!("relay_r2r_connect_latency_ms") + .record(r2r_connect_started.elapsed().as_secs_f64() * 1000.0); + let (mut send, mut recv) = connection.open_bi().await.context("open quic stream")?; + write_frame(&mut send, &prelude).await?; -async fn proxy_player_to_owner_pooled( - player_stream: TcpStream, - player_addr: SocketAddr, - hostname: String, - owner_instance_id: String, - owner_r2r_addr: String, - prelude: RelayForwardPrelude, - session_id: String, - cfg: RelayConfig, - guards: Arc, - r2r: Arc, -) -> Result<()> { - let stream_id = prelude.stream_id.clone(); - let sender = get_or_connect_r2r_pool( - owner_instance_id.clone(), - owner_r2r_addr, - cfg, - guards, - r2r.clone(), - ) - .await?; - - let (player_read, player_write) = player_stream.into_split(); - let (to_player_tx, to_player_rx) = mpsc::channel::>(128); - r2r.ingress_stream_sinks - .write() - .await - .insert(stream_id.clone(), to_player_tx); - - sender - .send(R2rFrame::Open(prelude)) - .await - .context("send r2r open")?; - - let tx = sender.clone(); - let sid = session_id.clone(); - let stid = stream_id.clone(); - let sinks = r2r.ingress_stream_sinks.clone(); - tokio::spawn(async move { - if let Err(e) = - run_ingress_player_reader_to_r2r(player_read, tx.clone(), sid.clone(), stid.clone()).await - { - debug!(stream_id = %stid, error = %e, "ingress player->r2r reader ended"); - } - let _ = tx - .send(R2rFrame::Close(R2rStreamClosed { - session_id: sid, - stream_id: stid.clone(), - reason: Some("ingress_player_reader_closed".into()), - })) - .await; - let _ = sinks.write().await.remove(&stid); - }); - - let stid = stream_id.clone(); - let sinks = r2r.ingress_stream_sinks.clone(); - tokio::spawn(async move { - if let Err(e) = run_ingress_player_writer(player_write, to_player_rx).await { - debug!(stream_id = %stid, error = %e, "ingress r2r->player writer ended"); - } - let _ = sinks.write().await.remove(&stid); - }); + let (mut player_read, mut player_write) = player_stream.into_split(); + 
let uplink = tokio::spawn(async move { + let _ = copy(&mut player_read, &mut send).await; + let _ = send.finish(); + }); + let downlink = tokio::spawn(async move { + let _ = copy(&mut recv, &mut player_write).await; + }); + let _ = uplink.await; + let _ = downlink.await; + } else { + let mut owner_stream = timeout(cfg.r2r_connect_timeout, TcpStream::connect(&r2r_addr)) + .await + .context("r2r connect timeout")??; + metrics::histogram!("relay_r2r_connect_latency_ms") + .record(r2r_connect_started.elapsed().as_secs_f64() * 1000.0); + write_frame(&mut owner_stream, &prelude).await?; + let _ = copy_bidirectional(&mut player_stream, &mut owner_stream).await?; + } metrics::counter!("relay_r2r_forwards_total").increment(1); - info!(peer = %player_addr, hostname = %hostname, owner = %owner_instance_id, stream_id = %stream_id, "proxied player via pooled r2r channel"); - Ok(()) -} - -async fn get_or_connect_r2r_pool( - owner_instance_id: String, - owner_r2r_addr: String, - cfg: RelayConfig, - guards: Arc, - r2r: Arc, -) -> Result> { - if let Some(existing) = r2r.outbound.lock().await.get(&owner_instance_id).cloned() { - return Ok(existing); - } - - let connect_started = Instant::now(); - let stream = timeout(cfg.r2r_connect_timeout, TcpStream::connect(&owner_r2r_addr)) - .await - .context("r2r connect timeout")??; - metrics::histogram!("relay_r2r_connect_latency_ms") - .record(connect_started.elapsed().as_secs_f64() * 1000.0); - - let (mut reader, mut writer) = stream.into_split(); - let (tx, mut rx) = mpsc::channel::(2048); - - let mut pools = r2r.outbound.lock().await; - if let Some(existing) = pools.get(&owner_instance_id).cloned() { - return Ok(existing); - } - pools.insert(owner_instance_id.clone(), tx.clone()); - drop(pools); - - let owner_for_reader = owner_instance_id.clone(); - let r2r_for_reader = r2r.clone(); - let guards_for_reader = guards.clone(); - tokio::spawn(async move { - loop { - match read_frame::<_, R2rFrame>(&mut reader).await { - Ok(frame) => { - if 
let Err(e) = handle_r2r_inbound_frame(frame, &r2r_for_reader, &guards_for_reader).await { - debug!(owner = %owner_for_reader, error = %e, "r2r pooled inbound frame error"); - break; - } - } - Err(e) => { - debug!(owner = %owner_for_reader, error = %e, "r2r pooled reader ended"); - break; - } - } - } - r2r_for_reader.outbound.lock().await.remove(&owner_for_reader); - }); - - tokio::spawn(async move { - while let Some(frame) = rx.recv().await { - if let Err(e) = write_frame(&mut writer, &frame).await { - debug!(error = %e, "r2r pooled writer ended"); - break; - } - } - }); - - Ok(tx) -} - -async fn handle_r2r_multiplex_conn( - stream: TcpStream, - _addr: SocketAddr, - _cfg: RelayConfig, - state: SharedState, - guards: Arc, - _r2r: Arc, -) -> Result<()> { - let (mut reader, mut writer) = stream.into_split(); - let (tx, mut rx) = mpsc::channel::(2048); - - let _writer_task = tokio::spawn(async move { - while let Some(frame) = rx.recv().await { - write_frame(&mut writer, &frame).await?; - } - Ok::<(), anyhow::Error>(()) - }); - - loop { - let frame: R2rFrame = read_frame(&mut reader).await?; - match frame { - R2rFrame::Open(prelude) => { - if prelude.version != 1 || prelude.hop_count > 1 { - continue; - } - if let Some(session) = local_session_for_session_id(&state, &prelude.session_id).await { - attach_virtual_r2r_stream_to_session(session, prelude, tx.clone()).await?; - } else { - let _ = tx.send(R2rFrame::Close(R2rStreamClosed { - session_id: prelude.session_id, - stream_id: prelude.stream_id, - reason: Some("owner_session_not_found".into()), - })).await; - } - } - R2rFrame::Data(data) => { - guards - .throttle_session_bytes(&data.session_id, SessionDir::EgressToClient, data.data.len()) - .await; - if let Some(session) = local_session_for_session_id(&state, &data.session_id).await { - let _ = session - .tx - .send(ServerFrame::StreamData(StreamData { stream_id: data.stream_id, data: data.data })) - .await; - } - } - R2rFrame::Close(close) => { - if let Some(session) 
= local_session_for_session_id(&state, &close.session_id).await { - let _ = session - .tx - .send(ServerFrame::StreamClosed(StreamClosed { stream_id: close.stream_id.clone(), reason: close.reason.clone() })) - .await; - remove_stream_sink(&state, &close.session_id, &close.stream_id).await; - } else { - remove_stream_sink(&state, &close.session_id, &close.stream_id).await; - } - } - R2rFrame::Ping => { - let _ = tx.send(R2rFrame::Pong).await; - } - R2rFrame::Pong => {} - } - } -} - -async fn attach_virtual_r2r_stream_to_session( - session: SessionHandle, - prelude: RelayForwardPrelude, - r2r_tx: mpsc::Sender, -) -> Result<()> { - let stream_id = prelude.stream_id.clone(); - let session_id = session.session_id.clone(); - let (to_r2r_tx, mut to_r2r_rx) = mpsc::channel::>(128); - session - .stream_sinks - .write() - .await - .insert(stream_id.clone(), to_r2r_tx); - - session - .tx - .send(ServerFrame::IncomingTcp(IncomingTcp { - stream_id: stream_id.clone(), - session_id: session_id.clone(), - peer_addr: prelude.peer_addr.clone(), - hostname: prelude.fqdn.clone(), - initial_data: prelude.initial_data.clone(), - })) - .await - .context("send virtual r2r IncomingTcp to client")?; - - tokio::spawn(async move { - while let Some(chunk) = to_r2r_rx.recv().await { - let _ = r2r_tx - .send(R2rFrame::Data(R2rStreamData { - session_id: session_id.clone(), - stream_id: stream_id.clone(), - data: chunk, - })) - .await; - } - let _ = r2r_tx - .send(R2rFrame::Close(R2rStreamClosed { - session_id, - stream_id, - reason: Some("owner_sink_closed".into()), - })) - .await; - }); - - Ok(()) -} - -async fn handle_r2r_inbound_frame( - frame: R2rFrame, - r2r: &R2rManager, - _guards: &RelayGuards, -) -> Result<()> { - match frame { - R2rFrame::Data(data) => { - if let Some(tx) = r2r.ingress_stream_sinks.read().await.get(&data.stream_id).cloned() { - let _ = tx.send(data.data).await; - } - } - R2rFrame::Close(close) => { - r2r.ingress_stream_sinks.write().await.remove(&close.stream_id); - } - 
R2rFrame::Ping | R2rFrame::Pong | R2rFrame::Open(_) => {} - } - Ok(()) -} - -async fn run_ingress_player_reader_to_r2r( - mut reader: tokio::net::tcp::OwnedReadHalf, - tx: mpsc::Sender, - session_id: String, - stream_id: String, -) -> Result<()> { - let mut buf = vec![0u8; 16 * 1024]; - loop { - let n = reader.read(&mut buf).await?; - if n == 0 { - break; - } - tx.send(R2rFrame::Data(R2rStreamData { - session_id: session_id.clone(), - stream_id: stream_id.clone(), - data: buf[..n].to_vec(), - })) - .await - .context("send ingress data to r2r")?; - } - Ok(()) -} - -async fn run_ingress_player_writer( - mut writer: tokio::net::tcp::OwnedWriteHalf, - mut rx: mpsc::Receiver>, -) -> Result<()> { - while let Some(chunk) = rx.recv().await { - writer.write_all(&chunk).await?; - } - let _ = writer.shutdown().await; + info!(peer = %player_addr, hostname = %hostname, owner = %route.instance_id, transport = %cfg.r2r_transport, "proxied player connection to owner relay"); Ok(()) } @@ -1278,51 +1446,154 @@ async fn attach_player_socket_to_session( stream_id_override: Option, source: &'static str, guards: Arc, + limits: Arc, + cfg: &RelayConfig, ) -> Result<()> { - let stream_id = stream_id_override.unwrap_or_else(|| Uuid::new_v4().to_string()); let (player_read, player_write) = stream.into_split(); - let (to_player_tx, to_player_rx) = mpsc::channel::>(128); - session.stream_sinks.write().await.insert(stream_id.clone(), to_player_tx); + attach_io_to_session( + player_read, + player_write, + session, + hostname, + peer_addr, + initial_data, + stream_id_override, + source, + guards, + limits, + cfg, + ) + .await +} - session +async fn attach_r2r_stream_to_session( + recv: quinn::RecvStream, + send: quinn::SendStream, + session: SessionHandle, + hostname: String, + peer_addr: String, + initial_data: Vec, + stream_id_override: Option, + source: &'static str, + guards: Arc, + limits: Arc, + cfg: &RelayConfig, +) -> Result<()> { + attach_io_to_session( + recv, + send, + session, + 
hostname, + peer_addr, + initial_data, + stream_id_override, + source, + guards, + limits, + cfg, + ) + .await +} + +async fn attach_io_to_session( + reader: R, + writer: W, + session: SessionHandle, + hostname: String, + peer_addr: String, + initial_data: Vec, + stream_id_override: Option, + source: &'static str, + guards: Arc, + limits: Arc, + cfg: &RelayConfig, +) -> Result<()> +where + R: tokio::io::AsyncRead + Unpin + Send + 'static, + W: tokio::io::AsyncWrite + Unpin + Send + 'static, +{ + if !limits + .allow_stream_open( + &session.session_id, + &session.user_id, + cfg.max_streams_per_session, + cfg.max_streams_per_user, + ) + .await + { + metrics::counter!("relay_stream_limit_denied_total").increment(1); + debug!(session_id = %session.session_id, user_id = %session.user_id, "stream limit denied"); + return Ok(()); + } + + let stream_id = stream_id_override.unwrap_or_else(|| Uuid::new_v4().to_string()); + let user_id = session.user_id.clone(); + let (to_player_tx, to_player_rx) = mpsc::channel::>(cfg.stream_queue_depth.max(1)); + let priority = if source == "r2r" { StreamPriority::High } else { StreamPriority::Normal }; + session.stream_sinks.write().await.insert(stream_id.clone(), to_player_tx); + session.stream_priorities.write().await.insert(stream_id.clone(), priority); + + let accepted = session .tx - .send(ServerFrame::IncomingTcp(IncomingTcp { + .send_high(ServerFrame::IncomingTcp(IncomingTcp { stream_id: stream_id.clone(), session_id: session.session_id.clone(), peer_addr: peer_addr.clone(), hostname: hostname.clone(), initial_data, })) - .await - .context("send IncomingTcp to client")?; + .await; + if !accepted { + metrics::counter!("relay_queue_drops_total", "queue" => "control_high").increment(1); + let _ = remove_stream_entry_by_store(session.stream_sinks.clone(), session.stream_priorities.clone(), &stream_id).await; + limits.release_stream(&session.session_id, &session.user_id).await; + return Ok(()); + } + + let release_once = 
Arc::new(AtomicBool::new(false)); let tx_control = session.tx.clone(); let stream_id_clone = stream_id.clone(); + let session_id_clone = session.session_id.clone(); let sinks = session.stream_sinks.clone(); + let priorities = session.stream_priorities.clone(); + let limits_clone = limits.clone(); + let release_once_clone = release_once.clone(); + let user_id_writer = user_id.clone(); tokio::spawn(async move { - if let Err(e) = run_player_writer(player_write, to_player_rx).await { + if let Err(e) = run_stream_writer(writer, to_player_rx).await { debug!(stream_id = %stream_id_clone, error = %e, "player writer ended"); } let _ = tx_control - .send(ServerFrame::StreamClosed(StreamClosed { + .send_high(ServerFrame::StreamClosed(StreamClosed { stream_id: stream_id_clone.clone(), reason: Some("player_writer_closed".into()), })) .await; - let _ = remove_stream_sink_by_store(sinks, &stream_id_clone).await; + let _ = remove_stream_entry_by_store(sinks, priorities, &stream_id_clone).await; + if !release_once_clone.swap(true, Ordering::SeqCst) { + limits_clone + .release_stream(&session_id_clone, &user_id_writer) + .await; + } }); let tx_control = session.tx.clone(); let stream_id_clone = stream_id.clone(); let session_id_clone = session.session_id.clone(); let sinks = session.stream_sinks.clone(); + let priorities = session.stream_priorities.clone(); let guards_clone = guards.clone(); + let limits_clone = limits.clone(); + let release_once_clone = release_once.clone(); + let user_id_reader = user_id.clone(); tokio::spawn(async move { - if let Err(e) = run_player_reader( - player_read, + if let Err(e) = run_stream_reader( + reader, tx_control.clone(), stream_id_clone.clone(), - session_id_clone, + session_id_clone.clone(), + priority, guards_clone, ) .await @@ -1330,12 +1601,17 @@ async fn attach_player_socket_to_session( debug!(stream_id = %stream_id_clone, error = %e, "player reader ended"); } let _ = tx_control - .send(ServerFrame::StreamClosed(StreamClosed { + 
.send_high(ServerFrame::StreamClosed(StreamClosed { stream_id: stream_id_clone.clone(), reason: Some("player_reader_closed".into()), })) .await; - let _ = remove_stream_sink_by_store(sinks, &stream_id_clone).await; + let _ = remove_stream_entry_by_store(sinks, priorities, &stream_id_clone).await; + if !release_once_clone.swap(true, Ordering::SeqCst) { + limits_clone + .release_stream(&session_id_clone, &user_id_reader) + .await; + } }); info!(peer = %peer_addr, hostname = %hostname, session_id = %session.session_id, stream_id = %stream_id, source, "player proxied via client stream"); @@ -1343,13 +1619,17 @@ async fn attach_player_socket_to_session( Ok(()) } -async fn run_player_reader( - mut reader: tokio::net::tcp::OwnedReadHalf, - tx_control: mpsc::Sender, +async fn run_stream_reader( + mut reader: R, + tx_control: ControlSender, stream_id: String, session_id: String, + priority: StreamPriority, guards: Arc, -) -> Result<()> { +) -> Result<()> +where + R: tokio::io::AsyncRead + Unpin, +{ let mut buf = vec![0u8; 16 * 1024]; loop { let n = reader.read(&mut buf).await?; @@ -1359,23 +1639,27 @@ async fn run_player_reader( guards .throttle_session_bytes(&session_id, SessionDir::EgressToClient, n) .await; - tx_control - .send(ServerFrame::StreamData(StreamData { - stream_id: stream_id.clone(), - data: buf[..n].to_vec(), - })) - .await - .context("send stream data to client")?; + let frame = ServerFrame::StreamData(StreamData { + stream_id: stream_id.clone(), + data: buf[..n].to_vec(), + }); + let sent = match priority { + StreamPriority::High => tx_control.send_high(frame).await, + StreamPriority::Normal => tx_control.send_low(frame).await, + }; + if !sent { + metrics::counter!("relay_queue_drops_total", "queue" => "control_data").increment(1); + } metrics::counter!("relay_bytes_out_total").increment(n as u64); } metrics::gauge!("relay_active_player_conns").decrement(1.0); Ok(()) } -async fn run_player_writer( - mut writer: tokio::net::tcp::OwnedWriteHalf, - mut rx: 
mpsc::Receiver>, -) -> Result<()> { +async fn run_stream_writer(mut writer: W, mut rx: mpsc::Receiver>) -> Result<()> +where + W: tokio::io::AsyncWrite + Unpin, +{ while let Some(chunk) = rx.recv().await { writer.write_all(&chunk).await?; metrics::counter!("relay_bytes_in_total").increment(chunk.len() as u64); @@ -1398,19 +1682,21 @@ async fn lookup_stream_sink( } async fn remove_stream_sink(state: &SharedState, session_id: &str, stream_id: &str) { - let store = { + let (store, priorities) = { let guard = state.read().await; let Some(fqdn) = guard.by_session.get(session_id).cloned() else { return; }; let Some(handle) = guard.by_fqdn.get(&fqdn) else { return; }; - handle.stream_sinks.clone() + (handle.stream_sinks.clone(), handle.stream_priorities.clone()) }; - let _ = remove_stream_sink_by_store(store, stream_id).await; + let _ = remove_stream_entry_by_store(store, priorities, stream_id).await; } -async fn remove_stream_sink_by_store( +async fn remove_stream_entry_by_store( store: Arc>>>>, + priorities: Arc>>, stream_id: &str, ) -> Option>> { + priorities.write().await.remove(stream_id); store.write().await.remove(stream_id) }