feat: add hybrid local and redis rate limiting enforcement
This commit is contained in:
@@ -134,6 +134,11 @@ struct RelayGuards {
|
|||||||
reg_ip_burst: f64,
|
reg_ip_burst: f64,
|
||||||
session_bw_rate_bytes: f64,
|
session_bw_rate_bytes: f64,
|
||||||
session_bw_burst_bytes: f64,
|
session_bw_burst_bytes: f64,
|
||||||
|
redis: Option<redis::aio::ConnectionManager>,
|
||||||
|
player_global_window_secs: u64,
|
||||||
|
player_global_limit: i64,
|
||||||
|
reg_global_window_secs: u64,
|
||||||
|
reg_global_limit: i64,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
@@ -186,7 +191,7 @@ impl BucketState {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl RelayGuards {
|
impl RelayGuards {
|
||||||
fn from_env() -> Self {
|
async fn from_env() -> Self {
|
||||||
let player_ip_rate = std::env::var("RELAY_PLAYER_CONNECTS_PER_SEC")
|
let player_ip_rate = std::env::var("RELAY_PLAYER_CONNECTS_PER_SEC")
|
||||||
.ok()
|
.ok()
|
||||||
.and_then(|v| v.parse().ok())
|
.and_then(|v| v.parse().ok())
|
||||||
@@ -216,6 +221,29 @@ impl RelayGuards {
|
|||||||
.and_then(|v| v.parse().ok())
|
.and_then(|v| v.parse().ok())
|
||||||
.unwrap_or(512.0)
|
.unwrap_or(512.0)
|
||||||
* 1024.0;
|
* 1024.0;
|
||||||
|
let redis = match std::env::var("REDIS_URL") {
|
||||||
|
Ok(url) => match redis::Client::open(url) {
|
||||||
|
Ok(client) => redis::aio::ConnectionManager::new(client).await.ok(),
|
||||||
|
Err(_) => None,
|
||||||
|
},
|
||||||
|
Err(_) => None,
|
||||||
|
};
|
||||||
|
let player_global_window_secs = std::env::var("RELAY_PLAYER_GLOBAL_WINDOW_SECS")
|
||||||
|
.ok()
|
||||||
|
.and_then(|v| v.parse().ok())
|
||||||
|
.unwrap_or(10);
|
||||||
|
let player_global_limit = std::env::var("RELAY_PLAYER_GLOBAL_LIMIT")
|
||||||
|
.ok()
|
||||||
|
.and_then(|v| v.parse().ok())
|
||||||
|
.unwrap_or(50);
|
||||||
|
let reg_global_window_secs = std::env::var("RELAY_REG_GLOBAL_WINDOW_SECS")
|
||||||
|
.ok()
|
||||||
|
.and_then(|v| v.parse().ok())
|
||||||
|
.unwrap_or(60);
|
||||||
|
let reg_global_limit = std::env::var("RELAY_REG_GLOBAL_LIMIT")
|
||||||
|
.ok()
|
||||||
|
.and_then(|v| v.parse().ok())
|
||||||
|
.unwrap_or(10);
|
||||||
|
|
||||||
Self {
|
Self {
|
||||||
player_ip: Arc::new(Mutex::new(HashMap::new())),
|
player_ip: Arc::new(Mutex::new(HashMap::new())),
|
||||||
@@ -228,6 +256,11 @@ impl RelayGuards {
|
|||||||
reg_ip_burst,
|
reg_ip_burst,
|
||||||
session_bw_rate_bytes,
|
session_bw_rate_bytes,
|
||||||
session_bw_burst_bytes,
|
session_bw_burst_bytes,
|
||||||
|
redis,
|
||||||
|
player_global_window_secs,
|
||||||
|
player_global_limit,
|
||||||
|
reg_global_window_secs,
|
||||||
|
reg_global_limit,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -249,7 +282,18 @@ impl RelayGuards {
|
|||||||
let bucket = guard
|
let bucket = guard
|
||||||
.entry(ip.to_string())
|
.entry(ip.to_string())
|
||||||
.or_insert_with(|| BucketState::new(burst, rate));
|
.or_insert_with(|| BucketState::new(burst, rate));
|
||||||
bucket.reserve_delay(1).is_zero()
|
let local_ok = bucket.reserve_delay(1).is_zero();
|
||||||
|
drop(guard);
|
||||||
|
if !local_ok {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
let (window_secs, limit, scope) = if player {
|
||||||
|
(self.player_global_window_secs, self.player_global_limit, "mc")
|
||||||
|
} else {
|
||||||
|
(self.reg_global_window_secs, self.reg_global_limit, "reg")
|
||||||
|
};
|
||||||
|
self.redis_allow_ip_window(ip, scope, window_secs, limit).await
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn throttle_session_bytes(&self, session_id: &str, dir: SessionDir, bytes: usize) {
|
async fn throttle_session_bytes(&self, session_id: &str, dir: SessionDir, bytes: usize) {
|
||||||
@@ -273,6 +317,31 @@ impl RelayGuards {
|
|||||||
self.session_ingress.lock().await.remove(session_id);
|
self.session_ingress.lock().await.remove(session_id);
|
||||||
self.session_egress.lock().await.remove(session_id);
|
self.session_egress.lock().await.remove(session_id);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn redis_allow_ip_window(
|
||||||
|
&self,
|
||||||
|
ip: &str,
|
||||||
|
scope: &str,
|
||||||
|
window_secs: u64,
|
||||||
|
limit: i64,
|
||||||
|
) -> bool {
|
||||||
|
let Some(mut conn) = self.redis.clone() else {
|
||||||
|
return true;
|
||||||
|
};
|
||||||
|
let key = format!("ratelimit:ip:{ip}:{scope}");
|
||||||
|
let res: redis::RedisResult<i64> = async {
|
||||||
|
let count: i64 = conn.incr(&key, 1).await?;
|
||||||
|
if count == 1 {
|
||||||
|
let _: bool = conn.expire(&key, window_secs as i64).await?;
|
||||||
|
}
|
||||||
|
Ok(count)
|
||||||
|
}
|
||||||
|
.await;
|
||||||
|
match res {
|
||||||
|
Ok(count) => count <= limit,
|
||||||
|
Err(_) => true,
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, Deserialize)]
|
#[derive(Debug, Clone, Deserialize)]
|
||||||
@@ -462,7 +531,7 @@ async fn main() -> Result<()> {
|
|||||||
|
|
||||||
let cfg = RelayConfig::from_env();
|
let cfg = RelayConfig::from_env();
|
||||||
let registry = RedisRegistry::from_env(&cfg).await;
|
let registry = RedisRegistry::from_env(&cfg).await;
|
||||||
let guards = Arc::new(RelayGuards::from_env());
|
let guards = Arc::new(RelayGuards::from_env().await);
|
||||||
let r2r = Arc::new(R2rManager::new());
|
let r2r = Arc::new(R2rManager::new());
|
||||||
registry.register_instance().await;
|
registry.register_instance().await;
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user