From d2db097445da11c3ed5820aefc8405c66743ad19 Mon Sep 17 00:00:00 2001 From: Lawrence Date: Tue, 24 Feb 2026 10:12:54 +0000 Subject: [PATCH] Polish metrics and validate config --- relay/src/main.rs | 100 ++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 87 insertions(+), 13 deletions(-) diff --git a/relay/src/main.rs b/relay/src/main.rs index 951db17..bd60020 100644 --- a/relay/src/main.rs +++ b/relay/src/main.rs @@ -65,7 +65,7 @@ struct RelayConfig { } impl RelayConfig { - fn from_env() -> Self { + fn from_env() -> Result { let control_bind = std::env::var("RELAY_CONTROL_BIND") .unwrap_or_else(|_| "0.0.0.0:7000".to_string()); let player_bind = @@ -84,7 +84,7 @@ impl RelayConfig { let r2r_quic_cert = std::env::var("RELAY_R2R_QUIC_CERT").ok(); let r2r_quic_key = std::env::var("RELAY_R2R_QUIC_KEY").ok(); - Self { + let cfg = Self { instance_id: std::env::var("RELAY_INSTANCE_ID") .unwrap_or_else(|_| format!("relay-{}", Uuid::new_v4())), region: std::env::var("RELAY_REGION").unwrap_or_else(|_| "eu".to_string()), @@ -122,8 +122,8 @@ impl RelayConfig { .ok() .and_then(|v| v.parse().ok()) .unwrap_or(128), - control_queue_policy: QueuePolicy::from_env("RELAY_CONTROL_QUEUE_POLICY"), - stream_queue_policy: QueuePolicy::from_env("RELAY_STREAM_QUEUE_POLICY"), + control_queue_policy: QueuePolicy::from_env("RELAY_CONTROL_QUEUE_POLICY")?, + stream_queue_policy: QueuePolicy::from_env("RELAY_STREAM_QUEUE_POLICY")?, max_streams_per_session: std::env::var("RELAY_MAX_STREAMS_PER_SESSION") .ok() .and_then(|v| v.parse().ok()) @@ -136,7 +136,36 @@ impl RelayConfig { .ok() .and_then(|v| v.parse().ok()) .unwrap_or(30), + }; + cfg.validate()?; + Ok(cfg) + } + + fn validate(&self) -> Result<()> { + if self.control_queue_depth == 0 { + anyhow::bail!("RELAY_CONTROL_QUEUE_DEPTH must be > 0"); } + if self.stream_queue_depth == 0 { + anyhow::bail!("RELAY_STREAM_QUEUE_DEPTH must be > 0"); + } + if self.limit_ttl_secs == 0 { + anyhow::bail!("RELAY_LIMIT_TTL_SECS must be > 0"); + } + match self.r2r_transport.as_str() { + "tcp" | "quic" => {} + other => anyhow::bail!("invalid RELAY_R2R_TRANSPORT: {other} (expected tcp|quic)"), + } + if self.r2r_transport == "quic" { + if (self.r2r_quic_cert.is_some() && self.r2r_quic_key.is_none()) + || (self.r2r_quic_cert.is_none() && self.r2r_quic_key.is_some()) + { + anyhow::bail!("RELAY_R2R_QUIC_CERT and RELAY_R2R_QUIC_KEY must be set together"); + } + if !self.r2r_quic_insecure && self.r2r_quic_cert.is_none() { + anyhow::bail!("QUIC requires RELAY_R2R_QUIC_CERT/KEY or RELAY_R2R_QUIC_INSECURE=1"); + } + } + Ok(()) } } @@ -205,14 +234,16 @@ enum QueuePolicy { } impl QueuePolicy { - fn from_env(var: &str) -> Self { + fn from_env(var: &str) -> Result { match std::env::var(var) .ok() .map(|v| v.to_ascii_lowercase()) .as_deref() { - Some("deny") => QueuePolicy::Deny, - _ => QueuePolicy::Drop, + None => Ok(QueuePolicy::Drop), + Some("deny") => Ok(QueuePolicy::Deny), + Some("drop") => Ok(QueuePolicy::Drop), + Some(other) => anyhow::bail!("{var} must be drop|deny (got {other})"), } } } @@ -805,7 +836,7 @@ async fn main() -> Result<()> { ) .init(); - let cfg = RelayConfig::from_env(); + let cfg = RelayConfig::from_env()?; let registry = RedisRegistry::from_env(&cfg).await; let limits = Arc::new(registry.limits(cfg.limit_ttl_secs)); let guards = Arc::new(RelayGuards::from_env()); @@ -832,7 +863,22 @@ async fn main() -> Result<()> { None }; - info!(instance_id = %cfg.instance_id, region = %cfg.region, control = %cfg.control_bind, player = %cfg.player_bind, r2r = %cfg.r2r_bind, r2r_advertise = %cfg.r2r_advertise_addr, r2r_transport = %cfg.r2r_transport, "relay started"); + info!( + instance_id = %cfg.instance_id, + region = %cfg.region, + control = %cfg.control_bind, + player = %cfg.player_bind, + r2r = %cfg.r2r_bind, + r2r_advertise = %cfg.r2r_advertise_addr, + r2r_transport = %cfg.r2r_transport, + control_queue_depth = cfg.control_queue_depth, + stream_queue_depth = cfg.stream_queue_depth, + control_queue_policy = ?cfg.control_queue_policy, + stream_queue_policy = ?cfg.stream_queue_policy, + max_streams_per_session = cfg.max_streams_per_session, + max_streams_per_user = cfg.max_streams_per_user, + "relay started" + ); metrics::gauge!("relay_drain_state").set(0.0); let shutdown = Arc::new(Notify::new()); @@ -1027,6 +1073,7 @@ async fn run_quic_r2r_accept_loop( tokio::select! { _ = shutdown.notified() => break, Some(connecting) = quic.server.accept() => { + metrics::counter!("relay_r2r_quic_accepts_total").increment(1); let cfg = cfg.clone(); let state = state.clone(); let guards = guards.clone(); @@ -1184,7 +1231,15 @@ async fn control_read_loop( let sink = lookup_stream_sink(state, session_id, &stream_id).await; if let Some(tx) = sink { if !send_stream_chunk(&tx, data, stream_queue_policy).await { - metrics::counter!("relay_queue_drops_total", "queue" => "stream_ingress").increment(1); + metrics::counter!( + "relay_queue_drops_total", + "queue" => "stream_ingress", + "policy" => match stream_queue_policy { + QueuePolicy::Drop => "drop", + QueuePolicy::Deny => "deny", + } + ) + .increment(1); } if tx.is_closed() { remove_stream_sink(state, session_id, &stream_id).await; @@ -1525,6 +1580,7 @@ where debug!(session_id = %session.session_id, user_id = %session.user_id, "stream limit denied"); return Ok(()); } + metrics::counter!("relay_stream_limit_allowed_total").increment(1); let stream_id = stream_id_override.unwrap_or_else(|| Uuid::new_v4().to_string()); let user_id = session.user_id.clone(); @@ -1532,6 +1588,7 @@ where let priority = if source == "r2r" { StreamPriority::High } else { StreamPriority::Normal }; session.stream_sinks.write().await.insert(stream_id.clone(), to_player_tx); session.stream_priorities.write().await.insert(stream_id.clone(), priority); + metrics::gauge!("relay_active_streams").increment(1.0); let accepted = session .tx @@ -1544,7 +1601,12 @@ where })) .await; if !accepted { - metrics::counter!("relay_queue_drops_total", "queue" => "control_high").increment(1); + metrics::counter!( + "relay_queue_drops_total", + "queue" => "control_high", + "priority" => "high" + ) + .increment(1); let _ = remove_stream_entry_by_store(session.stream_sinks.clone(), session.stream_priorities.clone(), &stream_id).await; limits.release_stream(&session.session_id, &session.user_id).await; return Ok(()); @@ -1648,7 +1710,15 @@ where StreamPriority::Normal => tx_control.send_low(frame).await, }; if !sent { - metrics::counter!("relay_queue_drops_total", "queue" => "control_data").increment(1); + metrics::counter!( + "relay_queue_drops_total", + "queue" => "control_data", + "priority" => match priority { + StreamPriority::High => "high", + StreamPriority::Normal => "normal", + } + ) + .increment(1); } metrics::counter!("relay_bytes_out_total").increment(n as u64); } @@ -1697,7 +1767,11 @@ async fn remove_stream_entry_by_store( stream_id: &str, ) -> Option>> { priorities.write().await.remove(stream_id); - store.write().await.remove(stream_id) + let removed = store.write().await.remove(stream_id); + if removed.is_some() { + metrics::gauge!("relay_active_streams").decrement(1.0); + } + removed } fn token_looks_valid(token: &str) -> bool {