feat: add prometheus metrics and tracing instrumentation

This commit is contained in:
L
2026-02-23 23:30:07 +00:00
parent fe8376dd6d
commit 4ce94a5b17
6 changed files with 661 additions and 12 deletions

View File

@@ -14,4 +14,6 @@ redis.workspace = true
serde_json.workspace = true
chrono.workspace = true
serde.workspace = true
metrics.workspace = true
metrics-exporter-prometheus.workspace = true
common = { path = "../common" }

View File

@@ -16,6 +16,7 @@ use common::{
};
use redis::AsyncCommands;
use serde::Deserialize;
use metrics_exporter_prometheus::PrometheusBuilder;
use tokio::{
io::{AsyncReadExt, AsyncWriteExt, copy_bidirectional},
net::{TcpListener, TcpStream},
@@ -436,6 +437,7 @@ impl RedisRegistry {
#[tokio::main]
async fn main() -> Result<()> {
init_metrics()?;
tracing_subscriber::fmt()
.with_env_filter(
tracing_subscriber::EnvFilter::try_from_default_env()
@@ -459,6 +461,7 @@ async fn main() -> Result<()> {
.with_context(|| format!("bind r2r {}", cfg.r2r_bind))?;
info!(instance_id = %cfg.instance_id, region = %cfg.region, control = %cfg.control_bind, player = %cfg.player_bind, r2r = %cfg.r2r_bind, r2r_advertise = %cfg.r2r_advertise_addr, "relay started");
metrics::gauge!("relay_drain_state").set(0.0);
let shutdown = Arc::new(Notify::new());
let state: SharedState = Arc::new(RwLock::new(RelayState::new()));
@@ -502,6 +505,7 @@ async fn main() -> Result<()> {
}
registry.set_draining().await;
metrics::gauge!("relay_drain_state").set(1.0);
shutdown.notify_waiters();
info!("draining relay");
tokio::time::sleep(Duration::from_secs(1)).await;
@@ -516,6 +520,7 @@ async fn run_registry_heartbeat(state: SharedState, registry: RedisRegistry, shu
_ = shutdown.notified() => break,
_ = ticker.tick() => {
let count = state.read().await.session_count();
metrics::gauge!("relay_active_tunnels").set(count as f64);
registry.heartbeat_instance(count).await;
}
}
@@ -538,6 +543,7 @@ async fn run_control_accept_loop(
Ok(v) => v,
Err(e) => { warn!(error = %e, "control accept failed"); continue; }
};
metrics::counter!("relay_control_accepts_total").increment(1);
let cfg = cfg.clone();
let state = state.clone();
let registry = registry.clone();
@@ -569,6 +575,7 @@ async fn run_player_accept_loop(
Ok(v) => v,
Err(e) => { warn!(error = %e, "player accept failed"); continue; }
};
metrics::counter!("relay_player_accepts_total").increment(1);
let cfg = cfg.clone();
let state = state.clone();
let registry = registry.clone();
@@ -599,6 +606,7 @@ async fn run_r2r_accept_loop(
Ok(v) => v,
Err(e) => { warn!(error = %e, "r2r accept failed"); continue; }
};
metrics::counter!("relay_r2r_accepts_total").increment(1);
let cfg = cfg.clone();
let state = state.clone();
let guards = guards.clone();
@@ -613,6 +621,7 @@ async fn run_r2r_accept_loop(
Ok(())
}
#[tracing::instrument(skip(stream, state, registry, guards, cfg), fields(peer = %addr))]
async fn handle_control_conn(
stream: TcpStream,
addr: SocketAddr,
@@ -622,6 +631,7 @@ async fn handle_control_conn(
guards: Arc<RelayGuards>,
) -> Result<()> {
if !guards.allow_registration_ip(&addr.ip().to_string()).await {
metrics::counter!("relay_rate_limited_total", "scope" => "registration_ip").increment(1);
anyhow::bail!("registration rate limited for {}", addr.ip());
}
@@ -667,6 +677,7 @@ async fn handle_control_conn(
owner_instance_id: cfg.instance_id.clone(),
})).await?;
info!(peer = %addr, user_id = %user_id, fqdn = %fqdn, session_id = %session_id, "client registered");
metrics::counter!("relay_tunnel_registrations_total").increment(1);
let write_task = tokio::spawn(async move {
while let Some(frame) = rx.recv().await {
@@ -749,6 +760,7 @@ async fn control_read_loop(
}
}
#[tracing::instrument(skip(stream, cfg, state, registry, guards), fields(peer = %addr))]
async fn handle_player_conn(
mut stream: TcpStream,
addr: SocketAddr,
@@ -758,6 +770,7 @@ async fn handle_player_conn(
guards: Arc<RelayGuards>,
) -> Result<()> {
if !guards.allow_player_ip(&addr.ip().to_string()).await {
metrics::counter!("relay_rate_limited_total", "scope" => "player_ip").increment(1);
debug!(peer = %addr, "player connect rate limited");
return Ok(());
}
@@ -792,6 +805,7 @@ async fn handle_player_conn(
Ok(())
}
#[tracing::instrument(skip(stream, _cfg, state, guards), fields(peer = %addr))]
async fn handle_r2r_conn(
mut stream: TcpStream,
addr: SocketAddr,
@@ -826,6 +840,7 @@ async fn handle_r2r_conn(
.with_context(|| format!("r2r attach failed from {addr}"))
}
#[tracing::instrument(skip(player_stream, route, cfg, registry), fields(peer = %player_addr, hostname = %hostname))]
async fn proxy_player_to_owner(
mut player_stream: TcpStream,
player_addr: SocketAddr,
@@ -835,18 +850,24 @@ async fn proxy_player_to_owner(
cfg: RelayConfig,
registry: RedisRegistry,
) -> Result<()> {
let redis_lookup_started = Instant::now();
let owner = registry
.lookup_instance(&route.instance_id)
.await
.with_context(|| format!("owner instance {} not found in redis", route.instance_id))?;
metrics::histogram!("relay_redis_lookup_latency_ms")
.record(redis_lookup_started.elapsed().as_secs_f64() * 1000.0);
let r2r_addr = owner
.r2r_addr
.clone()
.with_context(|| format!("owner {} missing r2r_addr", route.instance_id))?;
let r2r_connect_started = Instant::now();
let mut owner_stream = timeout(cfg.r2r_connect_timeout, TcpStream::connect(&r2r_addr))
.await
.context("r2r connect timeout")??;
metrics::histogram!("relay_r2r_connect_latency_ms")
.record(r2r_connect_started.elapsed().as_secs_f64() * 1000.0);
let prelude = RelayForwardPrelude {
version: 1,
@@ -861,6 +882,7 @@ async fn proxy_player_to_owner(
write_frame(&mut owner_stream, &prelude).await?;
let _ = copy_bidirectional(&mut player_stream, &mut owner_stream).await?;
metrics::counter!("relay_r2r_forwards_total").increment(1);
info!(peer = %player_addr, hostname = %hostname, owner = %route.instance_id, "proxied player connection to owner relay");
Ok(())
}
@@ -945,6 +967,7 @@ async fn attach_player_socket_to_session(
});
info!(peer = %peer_addr, hostname = %hostname, session_id = %session.session_id, stream_id = %stream_id, source, "player proxied via client stream");
metrics::gauge!("relay_active_player_conns").increment(1.0);
Ok(())
}
@@ -971,7 +994,9 @@ async fn run_player_reader(
}))
.await
.context("send stream data to client")?;
metrics::counter!("relay_bytes_out_total").increment(n as u64);
}
metrics::gauge!("relay_active_player_conns").decrement(1.0);
Ok(())
}
@@ -981,6 +1006,7 @@ async fn run_player_writer(
) -> Result<()> {
while let Some(chunk) = rx.recv().await {
writer.write_all(&chunk).await?;
metrics::counter!("relay_bytes_in_total").increment(chunk.len() as u64);
}
let _ = writer.shutdown().await;
Ok(())
@@ -1062,3 +1088,14 @@ fn guess_advertise_addr(bind: &str) -> String {
"127.0.0.1:7001".to_string()
}
}
/// Optionally install the Prometheus metrics exporter.
///
/// Reads `RELAY_METRICS_BIND` from the environment; when unset, this is a
/// no-op and all `metrics::` calls elsewhere fall back to the default
/// (no-op) recorder. When set, the value must parse as a socket address and
/// an HTTP scrape endpoint is started there.
fn init_metrics() -> Result<()> {
    match std::env::var("RELAY_METRICS_BIND") {
        // Exporter requested: parse the bind address, then install the
        // global recorder with its embedded HTTP listener.
        Ok(bind) => {
            let addr: std::net::SocketAddr =
                bind.parse().context("parse RELAY_METRICS_BIND")?;
            PrometheusBuilder::new()
                .with_http_listener(addr)
                .install()
                .context("install prometheus exporter")
        }
        // Env var absent or unreadable: metrics stay disabled, not an error.
        Err(_) => Ok(()),
    }
}