From e7ef7fdf70ce1216135366ccd3a0efccf6c3c65b Mon Sep 17 00:00:00 2001 From: L Date: Mon, 23 Feb 2026 23:23:22 +0000 Subject: [PATCH] feat: add replica-aware relay-to-relay forwarding --- common/src/protocol.rs | 12 ++ relay/Cargo.toml | 1 + relay/src/main.rs | 289 ++++++++++++++++++++++++++++++++++++----- 3 files changed, 270 insertions(+), 32 deletions(-) diff --git a/common/src/protocol.rs b/common/src/protocol.rs index c5e1700..b489abe 100644 --- a/common/src/protocol.rs +++ b/common/src/protocol.rs @@ -45,6 +45,18 @@ pub struct StreamClosed { pub reason: Option, } +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct RelayForwardPrelude { + pub version: u8, + pub session_id: String, + pub fqdn: String, + pub stream_id: String, + pub peer_addr: String, + pub origin_instance_id: String, + pub hop_count: u8, + pub initial_data: Vec, +} + #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(tag = "type", content = "data")] pub enum ClientFrame { diff --git a/relay/Cargo.toml b/relay/Cargo.toml index ee1ed0a..6aa53f1 100644 --- a/relay/Cargo.toml +++ b/relay/Cargo.toml @@ -13,4 +13,5 @@ fastrand.workspace = true redis.workspace = true serde_json.workspace = true chrono.workspace = true +serde.workspace = true common = { path = "../common" } diff --git a/relay/src/main.rs b/relay/src/main.rs index 2d8e34e..0476173 100644 --- a/relay/src/main.rs +++ b/relay/src/main.rs @@ -10,13 +10,14 @@ use common::{ codec::{read_frame, write_frame}, minecraft::read_handshake_hostname_and_bytes, protocol::{ - ClientFrame, Heartbeat, IncomingTcp, RegisterAccepted, RegisterRequest, ServerFrame, - StreamClosed, StreamData, + ClientFrame, Heartbeat, IncomingTcp, RegisterAccepted, RegisterRequest, RelayForwardPrelude, + ServerFrame, StreamClosed, StreamData, }, }; use redis::AsyncCommands; +use serde::Deserialize; use tokio::{ - io::{AsyncReadExt, AsyncWriteExt}, + io::{AsyncReadExt, AsyncWriteExt, copy_bidirectional}, net::{TcpListener, TcpStream}, sync::{Notify, RwLock, mpsc}, time::{MissedTickBehavior, interval, timeout}, @@ -30,21 +31,31 @@ struct RelayConfig { region: String, control_bind: String, player_bind: String, + r2r_bind: String, + r2r_advertise_addr: String, domain: String, heartbeat_timeout: Duration, registry_ttl_secs: u64, + r2r_connect_timeout: Duration, } impl RelayConfig { fn from_env() -> Self { + let control_bind = std::env::var("RELAY_CONTROL_BIND") + .unwrap_or_else(|_| "0.0.0.0:7000".to_string()); + let player_bind = + std::env::var("RELAY_PLAYER_BIND").unwrap_or_else(|_| "0.0.0.0:25565".to_string()); + let r2r_bind = std::env::var("RELAY_R2R_BIND").unwrap_or_else(|_| "0.0.0.0:7001".to_string()); + let r2r_advertise_addr = std::env::var("RELAY_R2R_ADVERTISE_ADDR") + .unwrap_or_else(|_| guess_advertise_addr(&r2r_bind)); Self { instance_id: std::env::var("RELAY_INSTANCE_ID") .unwrap_or_else(|_| format!("relay-{}", Uuid::new_v4())), region: std::env::var("RELAY_REGION").unwrap_or_else(|_| "eu".to_string()), - control_bind: std::env::var("RELAY_CONTROL_BIND") - .unwrap_or_else(|_| "0.0.0.0:7000".to_string()), - player_bind: std::env::var("RELAY_PLAYER_BIND") - .unwrap_or_else(|_| "0.0.0.0:25565".to_string()), + control_bind, + player_bind, + r2r_bind, + r2r_advertise_addr, domain: std::env::var("RELAY_BASE_DOMAIN").unwrap_or_else(|_| "dvv.one".to_string()), heartbeat_timeout: Duration::from_secs( std::env::var("RELAY_HEARTBEAT_TIMEOUT_SECS") @@ -56,6 +67,12 @@ impl RelayConfig { .ok() .and_then(|v| v.parse().ok()) .unwrap_or(20), + r2r_connect_timeout: Duration::from_secs( + std::env::var("RELAY_R2R_CONNECT_TIMEOUT_SECS") + .ok() + .and_then(|v| v.parse().ok()) + .unwrap_or(3), + ), } } } @@ -88,6 +105,26 @@ impl RelayState { type SharedState = Arc>; +#[derive(Debug, Clone, Deserialize)] +struct TunnelRouteRecord { + instance_id: String, + session_id: String, + #[serde(rename = "user_id")] + _user_id: Option, + #[serde(rename = "fqdn")] + _fqdn: Option, +} + +#[derive(Debug, Clone, Deserialize)] +struct RelayInstanceRecord { + #[serde(rename = "instance_id")] + _instance_id: String, + #[serde(rename = "region")] + _region: Option, + status: Option, + r2r_addr: Option, +} + #[derive(Clone)] struct RedisRegistry { conn: Option, @@ -95,6 +132,7 @@ struct RedisRegistry { region: String, control_addr: String, player_addr: String, + r2r_addr: String, ttl_secs: u64, } @@ -126,6 +164,7 @@ impl RedisRegistry { region: cfg.region.clone(), control_addr: cfg.control_bind.clone(), player_addr: cfg.player_bind.clone(), + r2r_addr: cfg.r2r_advertise_addr.clone(), ttl_secs: cfg.registry_ttl_secs, } } @@ -141,6 +180,7 @@ impl RedisRegistry { "status": "active", "control_addr": self.control_addr, "player_addr": self.player_addr, + "r2r_addr": self.r2r_addr, "started_at": chrono::Utc::now().timestamp(), }) .to_string(); @@ -172,6 +212,7 @@ impl RedisRegistry { "status": "active", "control_addr": self.control_addr, "player_addr": self.player_addr, + "r2r_addr": self.r2r_addr, "tunnel_count": tunnel_count, "updated_at": chrono::Utc::now().timestamp(), }) @@ -199,6 +240,7 @@ impl RedisRegistry { "status": "draining", "control_addr": self.control_addr, "player_addr": self.player_addr, + "r2r_addr": self.r2r_addr, "updated_at": chrono::Utc::now().timestamp(), }) .to_string(); @@ -248,6 +290,26 @@ impl RedisRegistry { } .await; } + + async fn lookup_tunnel(&self, fqdn: &str) -> Option { + let Some(mut conn) = self.conn.clone() else { + return None; + }; + let key = format!("tunnel:sub:{fqdn}"); + let raw: Option = conn.get(key).await.ok()?; + let raw = raw?; + serde_json::from_str(&raw).ok() + } + + async fn lookup_instance(&self, instance_id: &str) -> Option { + let Some(mut conn) = self.conn.clone() else { + return None; + }; + let key = format!("relay:instance:{instance_id}"); + let raw: Option = conn.get(key).await.ok()?; + let raw = raw?; + serde_json::from_str(&raw).ok() + } } #[tokio::main] @@ -269,17 +331,24 @@ async fn main() -> Result<()> { let player_listener = TcpListener::bind(&cfg.player_bind) .await .with_context(|| format!("bind player {}", cfg.player_bind))?; + let r2r_listener = TcpListener::bind(&cfg.r2r_bind) + .await + .with_context(|| format!("bind r2r {}", cfg.r2r_bind))?; - info!(instance_id = %cfg.instance_id, region = %cfg.region, "relay started"); + info!( + instance_id = %cfg.instance_id, + region = %cfg.region, + control = %cfg.control_bind, + player = %cfg.player_bind, + r2r = %cfg.r2r_bind, + r2r_advertise = %cfg.r2r_advertise_addr, + "relay started" + ); let shutdown = Arc::new(Notify::new()); let state: SharedState = Arc::new(RwLock::new(RelayState::new())); - let heartbeat_task = tokio::spawn(run_registry_heartbeat( - state.clone(), - registry.clone(), - shutdown.clone(), - )); + let heartbeat_task = tokio::spawn(run_registry_heartbeat(state.clone(), registry.clone(), shutdown.clone())); let control_task = tokio::spawn(run_control_accept_loop( control_listener, cfg.clone(), @@ -289,17 +358,28 @@ async fn main() -> Result<()> { )); let player_task = tokio::spawn(run_player_accept_loop( player_listener, + cfg.clone(), + state.clone(), + registry.clone(), + shutdown.clone(), + )); + let r2r_task = tokio::spawn(run_r2r_accept_loop( + r2r_listener, + cfg.clone(), state.clone(), shutdown.clone(), )); + tokio::pin!(heartbeat_task); tokio::pin!(control_task); tokio::pin!(player_task); + tokio::pin!(r2r_task); tokio::select! { _ = tokio::signal::ctrl_c() => info!("shutdown signal received"), res = &mut control_task => warn!("control accept loop ended: {:?}", res), res = &mut player_task => warn!("player accept loop ended: {:?}", res), + res = &mut r2r_task => warn!("r2r accept loop ended: {:?}", res), res = &mut heartbeat_task => warn!("registry heartbeat task ended: {:?}", res), } @@ -355,7 +435,9 @@ async fn run_control_accept_loop( async fn run_player_accept_loop( listener: TcpListener, + cfg: RelayConfig, state: SharedState, + registry: RedisRegistry, shutdown: Arc, ) -> Result<()> { loop { @@ -367,8 +449,10 @@ async fn run_player_accept_loop( Err(e) => { warn!(error = %e, "player accept failed"); continue; } }; let state = state.clone(); + let registry = registry.clone(); + let cfg = cfg.clone(); tokio::spawn(async move { - if let Err(e) = handle_player_conn(stream, addr, state).await { + if let Err(e) = handle_player_conn(stream, addr, cfg, state, registry).await { debug!(peer = %addr, error = %e, "player connection closed"); } }); @@ -378,6 +462,33 @@ async fn run_player_accept_loop( Ok(()) } +async fn run_r2r_accept_loop( + listener: TcpListener, + cfg: RelayConfig, + state: SharedState, + shutdown: Arc, +) -> Result<()> { + loop { + tokio::select! { + _ = shutdown.notified() => break, + res = listener.accept() => { + let (stream, addr) = match res { + Ok(v) => v, + Err(e) => { warn!(error = %e, "r2r accept failed"); continue; } + }; + let state = state.clone(); + let cfg = cfg.clone(); + tokio::spawn(async move { + if let Err(e) = handle_r2r_conn(stream, addr, cfg, state).await { + warn!(peer = %addr, error = %e, "r2r connection ended with error"); + } + }); + } + } + } + Ok(()) +} + async fn handle_control_conn( stream: TcpStream, addr: SocketAddr, @@ -518,22 +629,134 @@ async fn control_read_loop( async fn handle_player_conn( mut stream: TcpStream, addr: SocketAddr, + cfg: RelayConfig, state: SharedState, + registry: RedisRegistry, ) -> Result<()> { let (hostname, initial_data) = read_handshake_hostname_and_bytes(&mut stream) .await .context("parse minecraft handshake")?; - let session = { - let guard = state.read().await; - guard.by_fqdn.get(&hostname).cloned() - }; + if let Some(session) = local_session_for_hostname(&state, &hostname).await { + return attach_player_socket_to_session( + stream, + session, + hostname, + addr.to_string(), + initial_data, + None, + "direct", + ) + .await; + } + + if let Some(route) = registry.lookup_tunnel(&hostname).await { + if route.instance_id == cfg.instance_id { + debug!(peer = %addr, hostname = %hostname, session_id = %route.session_id, "route points to self but local session missing"); + return Ok(()); + } + return proxy_player_to_owner(stream, addr, hostname, initial_data, route, cfg, registry).await; + } + + debug!(peer = %addr, hostname = %hostname, "no tunnel for hostname"); + Ok(()) +} + +async fn handle_r2r_conn( + mut stream: TcpStream, + addr: SocketAddr, + _cfg: RelayConfig, + state: SharedState, +) -> Result<()> { + let prelude: RelayForwardPrelude = read_frame(&mut stream).await.context("read r2r prelude")?; + if prelude.version != 1 { + anyhow::bail!("unsupported r2r prelude version {}", prelude.version); + } + if prelude.hop_count > 1 { + anyhow::bail!("invalid hop_count {}", prelude.hop_count); + } + + let session = local_session_for_session_id(&state, &prelude.session_id).await; let Some(session) = session else { - debug!(peer = %addr, hostname = %hostname, "no tunnel for hostname"); - return Ok(()); + anyhow::bail!("owner session not found for {}", prelude.session_id); }; - let stream_id = Uuid::new_v4().to_string(); + attach_player_socket_to_session( + stream, + session, + prelude.fqdn.clone(), + prelude.peer_addr, + prelude.initial_data, + Some(prelude.stream_id), + "r2r", + ) + .await + .with_context(|| format!("r2r attach failed from {addr}")) +} + +async fn proxy_player_to_owner( + mut player_stream: TcpStream, + player_addr: SocketAddr, + hostname: String, + initial_data: Vec, + route: TunnelRouteRecord, + cfg: RelayConfig, + registry: RedisRegistry, +) -> Result<()> { + let owner = registry + .lookup_instance(&route.instance_id) + .await + .with_context(|| format!("owner instance {} not found in redis", route.instance_id))?; + if owner.status.as_deref() == Some("draining") { + debug!(owner = %route.instance_id, hostname = %hostname, "owner draining; attempting forward anyway"); + } + let r2r_addr = owner + .r2r_addr + .clone() + .with_context(|| format!("owner {} missing r2r_addr", route.instance_id))?; + + let mut owner_stream = timeout(cfg.r2r_connect_timeout, TcpStream::connect(&r2r_addr)) + .await + .context("r2r connect timeout")??; + + let prelude = RelayForwardPrelude { + version: 1, + session_id: route.session_id, + fqdn: hostname.clone(), + stream_id: Uuid::new_v4().to_string(), + peer_addr: player_addr.to_string(), + origin_instance_id: cfg.instance_id.clone(), + hop_count: 1, + initial_data, + }; + write_frame(&mut owner_stream, &prelude).await?; + + let _ = copy_bidirectional(&mut player_stream, &mut owner_stream).await?; + info!(peer = %player_addr, hostname = %hostname, owner = %route.instance_id, "proxied player connection to owner relay"); + Ok(()) +} + +async fn local_session_for_hostname(state: &SharedState, hostname: &str) -> Option { + let guard = state.read().await; + guard.by_fqdn.get(hostname).cloned() +} + +async fn local_session_for_session_id(state: &SharedState, session_id: &str) -> Option { + let guard = state.read().await; + let fqdn = guard.by_session.get(session_id)?.clone(); + guard.by_fqdn.get(&fqdn).cloned() +} + +async fn attach_player_socket_to_session( + stream: TcpStream, + session: SessionHandle, + hostname: String, + peer_addr: String, + initial_data: Vec, + stream_id_override: Option, + source: &'static str, +) -> Result<()> { + let stream_id = stream_id_override.unwrap_or_else(|| Uuid::new_v4().to_string()); let (player_read, player_write) = stream.into_split(); let (to_player_tx, to_player_rx) = mpsc::channel::>(128); session @@ -547,7 +770,7 @@ async fn handle_player_conn( .send(ServerFrame::IncomingTcp(IncomingTcp { stream_id: stream_id.clone(), session_id: session.session_id.clone(), - peer_addr: addr.to_string(), + peer_addr: peer_addr.clone(), hostname: hostname.clone(), initial_data, })) @@ -556,7 +779,6 @@ async fn handle_player_conn( let tx_control = session.tx.clone(); let stream_id_clone = stream_id.clone(); - let session_id_clone = session.session_id.clone(); let sinks = session.stream_sinks.clone(); tokio::spawn(async move { if let Err(e) = run_player_writer(player_write, to_player_rx).await { @@ -569,7 +791,6 @@ async fn handle_player_conn( })) .await; let _ = remove_stream_sink_by_store(sinks, &stream_id_clone).await; - let _ = session_id_clone; }); let tx_control = session.tx.clone(); @@ -588,7 +809,7 @@ async fn handle_player_conn( let _ = remove_stream_sink_by_store(sinks, &stream_id_clone).await; }); - info!(peer = %addr, hostname = %hostname, session_id = %session.session_id, stream_id = %stream_id, "player proxied via client stream"); + info!(peer = %peer_addr, hostname = %hostname, session_id = %session.session_id, stream_id = %stream_id, source, "player proxied via client stream"); Ok(()) } @@ -621,7 +842,7 @@ async fn run_player_writer( while let Some(chunk) = rx.recv().await { writer.write_all(&chunk).await?; } - writer.shutdown().await.ok(); + let _ = writer.shutdown().await; Ok(()) } @@ -641,12 +862,8 @@ async fn lookup_stream_sink( async fn remove_stream_sink(state: &SharedState, session_id: &str, stream_id: &str) { let store = { let guard = state.read().await; - let Some(fqdn) = guard.by_session.get(session_id).cloned() else { - return; - }; - let Some(handle) = guard.by_fqdn.get(&fqdn) else { - return; - }; + let Some(fqdn) = guard.by_session.get(session_id).cloned() else { return; }; + let Some(handle) = guard.by_fqdn.get(&fqdn) else { return; }; handle.stream_sinks.clone() }; let _ = remove_stream_sink_by_store(store, stream_id).await; @@ -697,3 +914,11 @@ fn sanitize_label(input: &str) -> String { .trim_matches('-') .to_ascii_lowercase() } + +fn guess_advertise_addr(bind: &str) -> String { + if let Some((_host, port)) = bind.rsplit_once(':') { + format!("127.0.0.1:{port}") + } else { + "127.0.0.1:7001".to_string() + } +}