From 28918880dafc7d216149dc58dc56f1965bff1619 Mon Sep 17 00:00:00 2001 From: L Date: Tue, 24 Feb 2026 08:25:28 +0000 Subject: [PATCH] feat: add hybrid local and redis rate limiting enforcement --- relay/src/main.rs | 75 +++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 72 insertions(+), 3 deletions(-) diff --git a/relay/src/main.rs b/relay/src/main.rs index 46d86b7..29f814a 100644 --- a/relay/src/main.rs +++ b/relay/src/main.rs @@ -134,6 +134,11 @@ struct RelayGuards { reg_ip_burst: f64, session_bw_rate_bytes: f64, session_bw_burst_bytes: f64, + redis: Option, + player_global_window_secs: u64, + player_global_limit: i64, + reg_global_window_secs: u64, + reg_global_limit: i64, } #[derive(Debug, Clone)] @@ -186,7 +191,7 @@ impl BucketState { } impl RelayGuards { - fn from_env() -> Self { + async fn from_env() -> Self { let player_ip_rate = std::env::var("RELAY_PLAYER_CONNECTS_PER_SEC") .ok() .and_then(|v| v.parse().ok()) @@ -216,6 +221,29 @@ impl RelayGuards { .and_then(|v| v.parse().ok()) .unwrap_or(512.0) * 1024.0; + let redis = match std::env::var("REDIS_URL") { + Ok(url) => match redis::Client::open(url) { + Ok(client) => redis::aio::ConnectionManager::new(client).await.ok(), + Err(_) => None, + }, + Err(_) => None, + }; + let player_global_window_secs = std::env::var("RELAY_PLAYER_GLOBAL_WINDOW_SECS") + .ok() + .and_then(|v| v.parse().ok()) + .unwrap_or(10); + let player_global_limit = std::env::var("RELAY_PLAYER_GLOBAL_LIMIT") + .ok() + .and_then(|v| v.parse().ok()) + .unwrap_or(50); + let reg_global_window_secs = std::env::var("RELAY_REG_GLOBAL_WINDOW_SECS") + .ok() + .and_then(|v| v.parse().ok()) + .unwrap_or(60); + let reg_global_limit = std::env::var("RELAY_REG_GLOBAL_LIMIT") + .ok() + .and_then(|v| v.parse().ok()) + .unwrap_or(10); Self { player_ip: Arc::new(Mutex::new(HashMap::new())), @@ -228,6 +256,11 @@ impl RelayGuards { reg_ip_burst, session_bw_rate_bytes, session_bw_burst_bytes, + redis, + player_global_window_secs, + player_global_limit, + reg_global_window_secs, + reg_global_limit, } } @@ -249,7 +282,18 @@ impl RelayGuards { let bucket = guard .entry(ip.to_string()) .or_insert_with(|| BucketState::new(burst, rate)); - bucket.reserve_delay(1).is_zero() + let local_ok = bucket.reserve_delay(1).is_zero(); + drop(guard); + if !local_ok { + return false; + } + + let (window_secs, limit, scope) = if player { + (self.player_global_window_secs, self.player_global_limit, "mc") + } else { + (self.reg_global_window_secs, self.reg_global_limit, "reg") + }; + self.redis_allow_ip_window(ip, scope, window_secs, limit).await } async fn throttle_session_bytes(&self, session_id: &str, dir: SessionDir, bytes: usize) { @@ -273,6 +317,31 @@ impl RelayGuards { self.session_ingress.lock().await.remove(session_id); self.session_egress.lock().await.remove(session_id); } + + async fn redis_allow_ip_window( + &self, + ip: &str, + scope: &str, + window_secs: u64, + limit: i64, + ) -> bool { + let Some(mut conn) = self.redis.clone() else { + return true; + }; + let key = format!("ratelimit:ip:{ip}:{scope}"); + let res: redis::RedisResult = async { + let count: i64 = conn.incr(&key, 1).await?; + if count == 1 { + let _: bool = conn.expire(&key, window_secs as i64).await?; + } + Ok(count) + } + .await; + match res { + Ok(count) => count <= limit, + Err(_) => true, + } + } } #[derive(Debug, Clone, Deserialize)] @@ -462,7 +531,7 @@ async fn main() -> Result<()> { let cfg = RelayConfig::from_env(); let registry = RedisRegistry::from_env(&cfg).await; - let guards = Arc::new(RelayGuards::from_env()); + let guards = Arc::new(RelayGuards::from_env().await); let r2r = Arc::new(R2rManager::new()); registry.register_instance().await;