feat: add replica-aware relay-to-relay forwarding
@@ -45,6 +45,18 @@ pub struct StreamClosed {
     pub reason: Option<String>,
 }
 
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct RelayForwardPrelude {
+    pub version: u8,
+    pub session_id: String,
+    pub fqdn: String,
+    pub stream_id: String,
+    pub peer_addr: String,
+    pub origin_instance_id: String,
+    pub hop_count: u8,
+    pub initial_data: Vec<u8>,
+}
+
 #[derive(Debug, Clone, Serialize, Deserialize)]
 #[serde(tag = "type", content = "data")]
 pub enum ClientFrame {
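
The prelude above is the entire relay-to-relay handshake: it is sent exactly once, as the first frame on a fresh TCP connection to the owner's r2r listener, using the same read_frame/write_frame codec the control channel already uses (the wire encoding itself lives in common::codec and is not part of this diff). A minimal sender-side sketch; addresses and ids are made-up examples, and the real sender is proxy_player_to_owner further down:

    // Sketch only -- assumes the imports this diff adds to the relay binary.
    async fn forward_example(handshake_bytes: Vec<u8>) -> anyhow::Result<()> {
        // Owner's advertised r2r address, as read from its Redis instance record.
        let mut owner = TcpStream::connect("10.0.0.7:7001").await?;
        write_frame(&mut owner, &RelayForwardPrelude {
            version: 1,                                // receiver bails on anything else
            session_id: "sess-123".into(),             // tunnel session local to the owner
            fqdn: "abc123.dvv.one".into(),             // hostname from the Minecraft handshake
            stream_id: Uuid::new_v4().to_string(),     // shared id, logged on both relays
            peer_addr: "203.0.113.9:51324".into(),     // the player's real address
            origin_instance_id: "relay-eu-2".into(),   // the forwarding replica
            hop_count: 1,                              // owner rejects hop_count > 1
            initial_data: handshake_bytes,             // replayed handshake bytes
        })
        .await?;
        // After the prelude, raw player bytes flow both ways (copy_bidirectional).
        Ok(())
    }
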
@@ -13,4 +13,5 @@ fastrand.workspace = true
 redis.workspace = true
 serde_json.workspace = true
 chrono.workspace = true
+serde.workspace = true
 common = { path = "../common" }
@@ -10,13 +10,14 @@ use common::{
     codec::{read_frame, write_frame},
     minecraft::read_handshake_hostname_and_bytes,
     protocol::{
-        ClientFrame, Heartbeat, IncomingTcp, RegisterAccepted, RegisterRequest, ServerFrame,
-        StreamClosed, StreamData,
+        ClientFrame, Heartbeat, IncomingTcp, RegisterAccepted, RegisterRequest, RelayForwardPrelude,
+        ServerFrame, StreamClosed, StreamData,
     },
 };
 use redis::AsyncCommands;
+use serde::Deserialize;
 use tokio::{
-    io::{AsyncReadExt, AsyncWriteExt},
+    io::{AsyncReadExt, AsyncWriteExt, copy_bidirectional},
     net::{TcpListener, TcpStream},
     sync::{Notify, RwLock, mpsc},
     time::{MissedTickBehavior, interval, timeout},
@@ -30,21 +31,31 @@ struct RelayConfig {
     region: String,
     control_bind: String,
     player_bind: String,
+    r2r_bind: String,
+    r2r_advertise_addr: String,
     domain: String,
     heartbeat_timeout: Duration,
     registry_ttl_secs: u64,
+    r2r_connect_timeout: Duration,
 }
 
 impl RelayConfig {
     fn from_env() -> Self {
+        let control_bind = std::env::var("RELAY_CONTROL_BIND")
+            .unwrap_or_else(|_| "0.0.0.0:7000".to_string());
+        let player_bind =
+            std::env::var("RELAY_PLAYER_BIND").unwrap_or_else(|_| "0.0.0.0:25565".to_string());
+        let r2r_bind = std::env::var("RELAY_R2R_BIND").unwrap_or_else(|_| "0.0.0.0:7001".to_string());
+        let r2r_advertise_addr = std::env::var("RELAY_R2R_ADVERTISE_ADDR")
+            .unwrap_or_else(|_| guess_advertise_addr(&r2r_bind));
         Self {
             instance_id: std::env::var("RELAY_INSTANCE_ID")
                 .unwrap_or_else(|_| format!("relay-{}", Uuid::new_v4())),
             region: std::env::var("RELAY_REGION").unwrap_or_else(|_| "eu".to_string()),
-            control_bind: std::env::var("RELAY_CONTROL_BIND")
-                .unwrap_or_else(|_| "0.0.0.0:7000".to_string()),
-            player_bind: std::env::var("RELAY_PLAYER_BIND")
-                .unwrap_or_else(|_| "0.0.0.0:25565".to_string()),
+            control_bind,
+            player_bind,
+            r2r_bind,
+            r2r_advertise_addr,
             domain: std::env::var("RELAY_BASE_DOMAIN").unwrap_or_else(|_| "dvv.one".to_string()),
             heartbeat_timeout: Duration::from_secs(
                 std::env::var("RELAY_HEARTBEAT_TIMEOUT_SECS")
@@ -56,6 +67,12 @@ impl RelayConfig {
                 .ok()
                 .and_then(|v| v.parse().ok())
                 .unwrap_or(20),
+            r2r_connect_timeout: Duration::from_secs(
+                std::env::var("RELAY_R2R_CONNECT_TIMEOUT_SECS")
+                    .ok()
+                    .and_then(|v| v.parse().ok())
+                    .unwrap_or(3),
+            ),
         }
     }
 }
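
The three new knobs and their fallbacks, as implemented above: RELAY_R2R_BIND defaults to 0.0.0.0:7001, RELAY_R2R_ADVERTISE_ADDR falls back to guess_advertise_addr (which, see the bottom of this diff, substitutes 127.0.0.1 for the bind host), and RELAY_R2R_CONNECT_TIMEOUT_SECS defaults to 3 seconds. A quick illustrative check of those defaults (not a test shipped in this commit):

    // With no RELAY_R2R_* variables set:
    let cfg = RelayConfig::from_env();
    assert_eq!(cfg.r2r_bind, "0.0.0.0:7001");
    assert_eq!(cfg.r2r_advertise_addr, "127.0.0.1:7001"); // guess_advertise_addr fallback
    assert_eq!(cfg.r2r_connect_timeout, Duration::from_secs(3));
    // The loopback guess only works when all replicas share a host; any multi-host
    // deployment must set RELAY_R2R_ADVERTISE_ADDR to an address the other
    // replicas can actually reach (e.g. the pod or VM IP plus the bind port).
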
@@ -88,6 +105,26 @@ impl RelayState {
 
 type SharedState = Arc<RwLock<RelayState>>;
 
+#[derive(Debug, Clone, Deserialize)]
+struct TunnelRouteRecord {
+    instance_id: String,
+    session_id: String,
+    #[serde(rename = "user_id")]
+    _user_id: Option<String>,
+    #[serde(rename = "fqdn")]
+    _fqdn: Option<String>,
+}
+
+#[derive(Debug, Clone, Deserialize)]
+struct RelayInstanceRecord {
+    #[serde(rename = "instance_id")]
+    _instance_id: String,
+    #[serde(rename = "region")]
+    _region: Option<String>,
+    status: Option<String>,
+    r2r_addr: Option<String>,
+}
+
 #[derive(Clone)]
 struct RedisRegistry {
     conn: Option<redis::aio::ConnectionManager>,
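
These two records deserialize JSON documents already in Redis: the instance record matches what this relay's own heartbeat writes (see the r2r_addr additions below), while the route record under tunnel:sub:<fqdn> is written by whichever component registers tunnels, so its exact shape is assumed here. Illustrative shapes with made-up values; the underscore-prefixed fields are deserialized but unused by the relay:

    let route = serde_json::json!({
        "instance_id": "relay-eu-1",   // which replica owns the client session
        "session_id": "sess-123",
        "user_id": "user-42",          // kept for completeness, ignored here
        "fqdn": "abc123.dvv.one",
    });
    let instance = serde_json::json!({
        "instance_id": "relay-eu-1",
        "region": "eu",
        "status": "active",            // "draining" still gets a best-effort forward
        "control_addr": "0.0.0.0:7000",
        "player_addr": "0.0.0.0:25565",
        "r2r_addr": "10.0.0.7:7001",   // must be reachable from the other replicas
        "tunnel_count": 3,
        "updated_at": 1735000000,
    });
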
@@ -95,6 +132,7 @@ struct RedisRegistry {
     region: String,
     control_addr: String,
     player_addr: String,
+    r2r_addr: String,
     ttl_secs: u64,
 }
 
@@ -126,6 +164,7 @@ impl RedisRegistry {
             region: cfg.region.clone(),
             control_addr: cfg.control_bind.clone(),
             player_addr: cfg.player_bind.clone(),
+            r2r_addr: cfg.r2r_advertise_addr.clone(),
             ttl_secs: cfg.registry_ttl_secs,
         }
     }
@@ -141,6 +180,7 @@ impl RedisRegistry {
             "status": "active",
             "control_addr": self.control_addr,
             "player_addr": self.player_addr,
+            "r2r_addr": self.r2r_addr,
             "started_at": chrono::Utc::now().timestamp(),
         })
         .to_string();
@@ -172,6 +212,7 @@ impl RedisRegistry {
             "status": "active",
             "control_addr": self.control_addr,
             "player_addr": self.player_addr,
+            "r2r_addr": self.r2r_addr,
             "tunnel_count": tunnel_count,
             "updated_at": chrono::Utc::now().timestamp(),
         })
@@ -199,6 +240,7 @@ impl RedisRegistry {
             "status": "draining",
             "control_addr": self.control_addr,
             "player_addr": self.player_addr,
+            "r2r_addr": self.r2r_addr,
             "updated_at": chrono::Utc::now().timestamp(),
         })
         .to_string();
@@ -248,6 +290,26 @@ impl RedisRegistry {
         }
         .await;
     }
+
+    async fn lookup_tunnel(&self, fqdn: &str) -> Option<TunnelRouteRecord> {
+        let Some(mut conn) = self.conn.clone() else {
+            return None;
+        };
+        let key = format!("tunnel:sub:{fqdn}");
+        let raw: Option<String> = conn.get(key).await.ok()?;
+        let raw = raw?;
+        serde_json::from_str(&raw).ok()
+    }
+
+    async fn lookup_instance(&self, instance_id: &str) -> Option<RelayInstanceRecord> {
+        let Some(mut conn) = self.conn.clone() else {
+            return None;
+        };
+        let key = format!("relay:instance:{instance_id}");
+        let raw: Option<String> = conn.get(key).await.ok()?;
+        let raw = raw?;
+        serde_json::from_str(&raw).ok()
+    }
 }
 
 #[tokio::main]
@@ -269,17 +331,24 @@ async fn main() -> Result<()> {
     let player_listener = TcpListener::bind(&cfg.player_bind)
         .await
        .with_context(|| format!("bind player {}", cfg.player_bind))?;
+    let r2r_listener = TcpListener::bind(&cfg.r2r_bind)
+        .await
+        .with_context(|| format!("bind r2r {}", cfg.r2r_bind))?;
 
-    info!(instance_id = %cfg.instance_id, region = %cfg.region, "relay started");
+    info!(
+        instance_id = %cfg.instance_id,
+        region = %cfg.region,
+        control = %cfg.control_bind,
+        player = %cfg.player_bind,
+        r2r = %cfg.r2r_bind,
+        r2r_advertise = %cfg.r2r_advertise_addr,
+        "relay started"
+    );
 
     let shutdown = Arc::new(Notify::new());
     let state: SharedState = Arc::new(RwLock::new(RelayState::new()));
 
-    let heartbeat_task = tokio::spawn(run_registry_heartbeat(
-        state.clone(),
-        registry.clone(),
-        shutdown.clone(),
-    ));
+    let heartbeat_task = tokio::spawn(run_registry_heartbeat(state.clone(), registry.clone(), shutdown.clone()));
     let control_task = tokio::spawn(run_control_accept_loop(
         control_listener,
         cfg.clone(),
@@ -289,17 +358,28 @@ async fn main() -> Result<()> {
     ));
     let player_task = tokio::spawn(run_player_accept_loop(
         player_listener,
+        cfg.clone(),
+        state.clone(),
+        registry.clone(),
+        shutdown.clone(),
+    ));
+    let r2r_task = tokio::spawn(run_r2r_accept_loop(
+        r2r_listener,
+        cfg.clone(),
         state.clone(),
         shutdown.clone(),
     ));
 
     tokio::pin!(heartbeat_task);
     tokio::pin!(control_task);
     tokio::pin!(player_task);
+    tokio::pin!(r2r_task);
 
     tokio::select! {
         _ = tokio::signal::ctrl_c() => info!("shutdown signal received"),
         res = &mut control_task => warn!("control accept loop ended: {:?}", res),
         res = &mut player_task => warn!("player accept loop ended: {:?}", res),
+        res = &mut r2r_task => warn!("r2r accept loop ended: {:?}", res),
         res = &mut heartbeat_task => warn!("registry heartbeat task ended: {:?}", res),
     }
 
@@ -355,7 +435,9 @@ async fn run_control_accept_loop(
 
 async fn run_player_accept_loop(
     listener: TcpListener,
+    cfg: RelayConfig,
     state: SharedState,
+    registry: RedisRegistry,
     shutdown: Arc<Notify>,
 ) -> Result<()> {
     loop {
@@ -367,8 +449,10 @@ async fn run_player_accept_loop(
                     Err(e) => { warn!(error = %e, "player accept failed"); continue; }
                 };
                 let state = state.clone();
+                let registry = registry.clone();
+                let cfg = cfg.clone();
                 tokio::spawn(async move {
-                    if let Err(e) = handle_player_conn(stream, addr, state).await {
+                    if let Err(e) = handle_player_conn(stream, addr, cfg, state, registry).await {
                         debug!(peer = %addr, error = %e, "player connection closed");
                     }
                 });
@@ -378,6 +462,33 @@ async fn run_player_accept_loop(
     Ok(())
 }
 
+async fn run_r2r_accept_loop(
+    listener: TcpListener,
+    cfg: RelayConfig,
+    state: SharedState,
+    shutdown: Arc<Notify>,
+) -> Result<()> {
+    loop {
+        tokio::select! {
+            _ = shutdown.notified() => break,
+            res = listener.accept() => {
+                let (stream, addr) = match res {
+                    Ok(v) => v,
+                    Err(e) => { warn!(error = %e, "r2r accept failed"); continue; }
+                };
+                let state = state.clone();
+                let cfg = cfg.clone();
+                tokio::spawn(async move {
+                    if let Err(e) = handle_r2r_conn(stream, addr, cfg, state).await {
+                        warn!(peer = %addr, error = %e, "r2r connection ended with error");
+                    }
+                });
+            }
+        }
+    }
+    Ok(())
+}
+
 async fn handle_control_conn(
     stream: TcpStream,
     addr: SocketAddr,
@@ -518,22 +629,134 @@ async fn control_read_loop(
 async fn handle_player_conn(
     mut stream: TcpStream,
     addr: SocketAddr,
+    cfg: RelayConfig,
     state: SharedState,
+    registry: RedisRegistry,
 ) -> Result<()> {
     let (hostname, initial_data) = read_handshake_hostname_and_bytes(&mut stream)
         .await
         .context("parse minecraft handshake")?;
 
-    let session = {
-        let guard = state.read().await;
-        guard.by_fqdn.get(&hostname).cloned()
-    };
-    let Some(session) = session else {
-        debug!(peer = %addr, hostname = %hostname, "no tunnel for hostname");
+    if let Some(session) = local_session_for_hostname(&state, &hostname).await {
+        return attach_player_socket_to_session(
+            stream,
+            session,
+            hostname,
+            addr.to_string(),
+            initial_data,
+            None,
+            "direct",
+        )
+        .await;
+    }
+
+    if let Some(route) = registry.lookup_tunnel(&hostname).await {
+        if route.instance_id == cfg.instance_id {
+            debug!(peer = %addr, hostname = %hostname, session_id = %route.session_id, "route points to self but local session missing");
             return Ok(());
+        }
+        return proxy_player_to_owner(stream, addr, hostname, initial_data, route, cfg, registry).await;
+    }
+
+    debug!(peer = %addr, hostname = %hostname, "no tunnel for hostname");
+    Ok(())
+}
+
+async fn handle_r2r_conn(
+    mut stream: TcpStream,
+    addr: SocketAddr,
+    _cfg: RelayConfig,
+    state: SharedState,
+) -> Result<()> {
+    let prelude: RelayForwardPrelude = read_frame(&mut stream).await.context("read r2r prelude")?;
+    if prelude.version != 1 {
+        anyhow::bail!("unsupported r2r prelude version {}", prelude.version);
+    }
+    if prelude.hop_count > 1 {
+        anyhow::bail!("invalid hop_count {}", prelude.hop_count);
+    }
+
+    let session = local_session_for_session_id(&state, &prelude.session_id).await;
+    let Some(session) = session else {
+        anyhow::bail!("owner session not found for {}", prelude.session_id);
     };
 
-    let stream_id = Uuid::new_v4().to_string();
+    attach_player_socket_to_session(
+        stream,
+        session,
+        prelude.fqdn.clone(),
+        prelude.peer_addr,
+        prelude.initial_data,
+        Some(prelude.stream_id),
+        "r2r",
+    )
+    .await
+    .with_context(|| format!("r2r attach failed from {addr}"))
+}
+
+async fn proxy_player_to_owner(
+    mut player_stream: TcpStream,
+    player_addr: SocketAddr,
+    hostname: String,
+    initial_data: Vec<u8>,
+    route: TunnelRouteRecord,
+    cfg: RelayConfig,
+    registry: RedisRegistry,
+) -> Result<()> {
+    let owner = registry
+        .lookup_instance(&route.instance_id)
+        .await
+        .with_context(|| format!("owner instance {} not found in redis", route.instance_id))?;
+    if owner.status.as_deref() == Some("draining") {
+        debug!(owner = %route.instance_id, hostname = %hostname, "owner draining; attempting forward anyway");
+    }
+    let r2r_addr = owner
+        .r2r_addr
+        .clone()
+        .with_context(|| format!("owner {} missing r2r_addr", route.instance_id))?;
+
+    let mut owner_stream = timeout(cfg.r2r_connect_timeout, TcpStream::connect(&r2r_addr))
+        .await
+        .context("r2r connect timeout")??;
+
+    let prelude = RelayForwardPrelude {
+        version: 1,
+        session_id: route.session_id,
+        fqdn: hostname.clone(),
+        stream_id: Uuid::new_v4().to_string(),
+        peer_addr: player_addr.to_string(),
+        origin_instance_id: cfg.instance_id.clone(),
+        hop_count: 1,
+        initial_data,
+    };
+    write_frame(&mut owner_stream, &prelude).await?;
+
+    let _ = copy_bidirectional(&mut player_stream, &mut owner_stream).await?;
+    info!(peer = %player_addr, hostname = %hostname, owner = %route.instance_id, "proxied player connection to owner relay");
+    Ok(())
+}
+
+async fn local_session_for_hostname(state: &SharedState, hostname: &str) -> Option<SessionHandle> {
+    let guard = state.read().await;
+    guard.by_fqdn.get(hostname).cloned()
+}
+
+async fn local_session_for_session_id(state: &SharedState, session_id: &str) -> Option<SessionHandle> {
+    let guard = state.read().await;
+    let fqdn = guard.by_session.get(session_id)?.clone();
+    guard.by_fqdn.get(&fqdn).cloned()
+}
+
+async fn attach_player_socket_to_session(
+    stream: TcpStream,
+    session: SessionHandle,
+    hostname: String,
+    peer_addr: String,
+    initial_data: Vec<u8>,
+    stream_id_override: Option<String>,
+    source: &'static str,
+) -> Result<()> {
+    let stream_id = stream_id_override.unwrap_or_else(|| Uuid::new_v4().to_string());
     let (player_read, player_write) = stream.into_split();
     let (to_player_tx, to_player_rx) = mpsc::channel::<Vec<u8>>(128);
     session
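
Two details in the hunk above are easy to miss. The hop_count field acts as a loop guard: proxy_player_to_owner always sends hop_count: 1 and handle_r2r_conn bails on anything greater, so a connection is forwarded at most once; if a stale route points at another replica instead of the owner, the attempt fails fast rather than bouncing the player around the fleet. And the owner attaches the forwarded socket with Some(prelude.stream_id) instead of minting a fresh id, so the origin and the owner log the same stream_id for a single player connection.
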
@@ -547,7 +770,7 @@ async fn handle_player_conn(
         .send(ServerFrame::IncomingTcp(IncomingTcp {
             stream_id: stream_id.clone(),
             session_id: session.session_id.clone(),
-            peer_addr: addr.to_string(),
+            peer_addr: peer_addr.clone(),
             hostname: hostname.clone(),
             initial_data,
         }))
@@ -556,7 +779,6 @@ async fn handle_player_conn(
 
     let tx_control = session.tx.clone();
     let stream_id_clone = stream_id.clone();
-    let session_id_clone = session.session_id.clone();
     let sinks = session.stream_sinks.clone();
     tokio::spawn(async move {
         if let Err(e) = run_player_writer(player_write, to_player_rx).await {
@@ -569,7 +791,6 @@ async fn handle_player_conn(
             }))
             .await;
         let _ = remove_stream_sink_by_store(sinks, &stream_id_clone).await;
-        let _ = session_id_clone;
     });
 
     let tx_control = session.tx.clone();
@@ -588,7 +809,7 @@ async fn handle_player_conn(
         let _ = remove_stream_sink_by_store(sinks, &stream_id_clone).await;
     });
 
-    info!(peer = %addr, hostname = %hostname, session_id = %session.session_id, stream_id = %stream_id, "player proxied via client stream");
+    info!(peer = %peer_addr, hostname = %hostname, session_id = %session.session_id, stream_id = %stream_id, source, "player proxied via client stream");
     Ok(())
 }
 
@@ -621,7 +842,7 @@ async fn run_player_writer(
     while let Some(chunk) = rx.recv().await {
         writer.write_all(&chunk).await?;
     }
-    writer.shutdown().await.ok();
+    let _ = writer.shutdown().await;
     Ok(())
 }
 
@@ -641,12 +862,8 @@ async fn lookup_stream_sink(
 async fn remove_stream_sink(state: &SharedState, session_id: &str, stream_id: &str) {
     let store = {
         let guard = state.read().await;
-        let Some(fqdn) = guard.by_session.get(session_id).cloned() else {
-            return;
-        };
-        let Some(handle) = guard.by_fqdn.get(&fqdn) else {
-            return;
-        };
+        let Some(fqdn) = guard.by_session.get(session_id).cloned() else { return; };
+        let Some(handle) = guard.by_fqdn.get(&fqdn) else { return; };
         handle.stream_sinks.clone()
     };
     let _ = remove_stream_sink_by_store(store, stream_id).await;
@@ -697,3 +914,11 @@ fn sanitize_label(input: &str) -> String {
         .trim_matches('-')
         .to_ascii_lowercase()
 }
+
+fn guess_advertise_addr(bind: &str) -> String {
+    if let Some((_host, port)) = bind.rsplit_once(':') {
+        format!("127.0.0.1:{port}")
+    } else {
+        "127.0.0.1:7001".to_string()
+    }
+}