Polish metrics and validate config
All checks were successful
ci / build (push) Successful in 2m27s

This commit is contained in:
2026-02-24 10:12:54 +00:00
parent f0c5261e7b
commit d2db097445

View File

@@ -65,7 +65,7 @@ struct RelayConfig {
}
impl RelayConfig {
fn from_env() -> Self {
fn from_env() -> Result<Self> {
let control_bind = std::env::var("RELAY_CONTROL_BIND")
.unwrap_or_else(|_| "0.0.0.0:7000".to_string());
let player_bind =
@@ -84,7 +84,7 @@ impl RelayConfig {
let r2r_quic_cert = std::env::var("RELAY_R2R_QUIC_CERT").ok();
let r2r_quic_key = std::env::var("RELAY_R2R_QUIC_KEY").ok();
Self {
let cfg = Self {
instance_id: std::env::var("RELAY_INSTANCE_ID")
.unwrap_or_else(|_| format!("relay-{}", Uuid::new_v4())),
region: std::env::var("RELAY_REGION").unwrap_or_else(|_| "eu".to_string()),
@@ -122,8 +122,8 @@ impl RelayConfig {
.ok()
.and_then(|v| v.parse().ok())
.unwrap_or(128),
control_queue_policy: QueuePolicy::from_env("RELAY_CONTROL_QUEUE_POLICY"),
stream_queue_policy: QueuePolicy::from_env("RELAY_STREAM_QUEUE_POLICY"),
control_queue_policy: QueuePolicy::from_env("RELAY_CONTROL_QUEUE_POLICY")?,
stream_queue_policy: QueuePolicy::from_env("RELAY_STREAM_QUEUE_POLICY")?,
max_streams_per_session: std::env::var("RELAY_MAX_STREAMS_PER_SESSION")
.ok()
.and_then(|v| v.parse().ok())
@@ -136,7 +136,36 @@ impl RelayConfig {
.ok()
.and_then(|v| v.parse().ok())
.unwrap_or(30),
};
cfg.validate()?;
Ok(cfg)
}
fn validate(&self) -> Result<()> {
if self.control_queue_depth == 0 {
anyhow::bail!("RELAY_CONTROL_QUEUE_DEPTH must be > 0");
}
if self.stream_queue_depth == 0 {
anyhow::bail!("RELAY_STREAM_QUEUE_DEPTH must be > 0");
}
if self.limit_ttl_secs == 0 {
anyhow::bail!("RELAY_LIMIT_TTL_SECS must be > 0");
}
match self.r2r_transport.as_str() {
"tcp" | "quic" => {}
other => anyhow::bail!("invalid RELAY_R2R_TRANSPORT: {other} (expected tcp|quic)"),
}
if self.r2r_transport == "quic" {
if (self.r2r_quic_cert.is_some() && self.r2r_quic_key.is_none())
|| (self.r2r_quic_cert.is_none() && self.r2r_quic_key.is_some())
{
anyhow::bail!("RELAY_R2R_QUIC_CERT and RELAY_R2R_QUIC_KEY must be set together");
}
if !self.r2r_quic_insecure && self.r2r_quic_cert.is_none() {
anyhow::bail!("QUIC requires RELAY_R2R_QUIC_CERT/KEY or RELAY_R2R_QUIC_INSECURE=1");
}
}
Ok(())
}
}
@@ -205,14 +234,16 @@ enum QueuePolicy {
}
impl QueuePolicy {
fn from_env(var: &str) -> Self {
fn from_env(var: &str) -> Result<Self> {
match std::env::var(var)
.ok()
.map(|v| v.to_ascii_lowercase())
.as_deref()
{
Some("deny") => QueuePolicy::Deny,
_ => QueuePolicy::Drop,
None => Ok(QueuePolicy::Drop),
Some("deny") => Ok(QueuePolicy::Deny),
Some("drop") => Ok(QueuePolicy::Drop),
Some(other) => anyhow::bail!("{var} must be drop|deny (got {other})"),
}
}
}
@@ -805,7 +836,7 @@ async fn main() -> Result<()> {
)
.init();
let cfg = RelayConfig::from_env();
let cfg = RelayConfig::from_env()?;
let registry = RedisRegistry::from_env(&cfg).await;
let limits = Arc::new(registry.limits(cfg.limit_ttl_secs));
let guards = Arc::new(RelayGuards::from_env());
@@ -832,7 +863,22 @@ async fn main() -> Result<()> {
None
};
info!(instance_id = %cfg.instance_id, region = %cfg.region, control = %cfg.control_bind, player = %cfg.player_bind, r2r = %cfg.r2r_bind, r2r_advertise = %cfg.r2r_advertise_addr, r2r_transport = %cfg.r2r_transport, "relay started");
info!(
instance_id = %cfg.instance_id,
region = %cfg.region,
control = %cfg.control_bind,
player = %cfg.player_bind,
r2r = %cfg.r2r_bind,
r2r_advertise = %cfg.r2r_advertise_addr,
r2r_transport = %cfg.r2r_transport,
control_queue_depth = cfg.control_queue_depth,
stream_queue_depth = cfg.stream_queue_depth,
control_queue_policy = ?cfg.control_queue_policy,
stream_queue_policy = ?cfg.stream_queue_policy,
max_streams_per_session = cfg.max_streams_per_session,
max_streams_per_user = cfg.max_streams_per_user,
"relay started"
);
metrics::gauge!("relay_drain_state").set(0.0);
let shutdown = Arc::new(Notify::new());
@@ -1027,6 +1073,7 @@ async fn run_quic_r2r_accept_loop(
tokio::select! {
_ = shutdown.notified() => break,
Some(connecting) = quic.server.accept() => {
metrics::counter!("relay_r2r_quic_accepts_total").increment(1);
let cfg = cfg.clone();
let state = state.clone();
let guards = guards.clone();
@@ -1184,7 +1231,15 @@ async fn control_read_loop(
let sink = lookup_stream_sink(state, session_id, &stream_id).await;
if let Some(tx) = sink {
if !send_stream_chunk(&tx, data, stream_queue_policy).await {
metrics::counter!("relay_queue_drops_total", "queue" => "stream_ingress").increment(1);
metrics::counter!(
"relay_queue_drops_total",
"queue" => "stream_ingress",
"policy" => match stream_queue_policy {
QueuePolicy::Drop => "drop",
QueuePolicy::Deny => "deny",
}
)
.increment(1);
}
if tx.is_closed() {
remove_stream_sink(state, session_id, &stream_id).await;
@@ -1525,6 +1580,7 @@ where
debug!(session_id = %session.session_id, user_id = %session.user_id, "stream limit denied");
return Ok(());
}
metrics::counter!("relay_stream_limit_allowed_total").increment(1);
let stream_id = stream_id_override.unwrap_or_else(|| Uuid::new_v4().to_string());
let user_id = session.user_id.clone();
@@ -1532,6 +1588,7 @@ where
let priority = if source == "r2r" { StreamPriority::High } else { StreamPriority::Normal };
session.stream_sinks.write().await.insert(stream_id.clone(), to_player_tx);
session.stream_priorities.write().await.insert(stream_id.clone(), priority);
metrics::gauge!("relay_active_streams").increment(1.0);
let accepted = session
.tx
@@ -1544,7 +1601,12 @@ where
}))
.await;
if !accepted {
metrics::counter!("relay_queue_drops_total", "queue" => "control_high").increment(1);
metrics::counter!(
"relay_queue_drops_total",
"queue" => "control_high",
"priority" => "high"
)
.increment(1);
let _ = remove_stream_entry_by_store(session.stream_sinks.clone(), session.stream_priorities.clone(), &stream_id).await;
limits.release_stream(&session.session_id, &session.user_id).await;
return Ok(());
@@ -1648,7 +1710,15 @@ where
StreamPriority::Normal => tx_control.send_low(frame).await,
};
if !sent {
metrics::counter!("relay_queue_drops_total", "queue" => "control_data").increment(1);
metrics::counter!(
"relay_queue_drops_total",
"queue" => "control_data",
"priority" => match priority {
StreamPriority::High => "high",
StreamPriority::Normal => "normal",
}
)
.increment(1);
}
metrics::counter!("relay_bytes_out_total").increment(n as u64);
}
@@ -1697,7 +1767,11 @@ async fn remove_stream_entry_by_store(
stream_id: &str,
) -> Option<mpsc::Sender<Vec<u8>>> {
priorities.write().await.remove(stream_id);
store.write().await.remove(stream_id)
let removed = store.write().await.remove(stream_id);
if removed.is_some() {
metrics::gauge!("relay_active_streams").decrement(1.0);
}
removed
}
fn token_looks_valid(token: &str) -> bool {