feat: scaffold relay client auth workspace

This commit is contained in:
L
2026-02-23 23:18:56 +00:00
parent b4942e4ab1
commit 050dbc792a
18 changed files with 3724 additions and 0 deletions

5
.gitignore vendored Normal file
View File

@@ -0,0 +1,5 @@
target/
.DS_Store
.env
.env.*
!.env.example

1798
Cargo.lock generated Normal file

File diff suppressed because it is too large Load Diff

21
Cargo.toml Normal file
View File

@@ -0,0 +1,21 @@
[workspace]
members = ["common", "relay", "client", "auth-api"]
resolver = "2"
[workspace.package]
edition = "2024"
[workspace.dependencies]
anyhow = "1.0"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
tokio = { version = "1.44", features = ["full"] }
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter", "fmt"] }
uuid = { version = "1.16", features = ["v4", "serde"] }
thiserror = "2.0"
fastrand = "2.3"
axum = "0.8"
redis = { version = "0.32", features = ["tokio-comp", "connection-manager"] }
jsonwebtoken = "10"
chrono = { version = "0.4", features = ["serde", "clock"] }

12
Dockerfile.auth-api Normal file
View File

@@ -0,0 +1,12 @@
FROM rust:1.86 as build
WORKDIR /app
COPY . .
RUN cargo build --release -p auth-api
FROM debian:bookworm-slim
RUN useradd -r -u 10001 appuser
WORKDIR /app
COPY --from=build /app/target/release/auth-api /usr/local/bin/auth-api
USER appuser
EXPOSE 8080
CMD ["auth-api"]

311
MASTER_PROMPT.md Normal file
View File

@@ -0,0 +1,311 @@
# 🧠 MASTER SYSTEM PROMPT
## Distributed Minecraft Tunnel Platform (Railway-Scalable, Replica-Aware, Monetizable)
---
## ROLE
You are a senior distributed systems architect and Rust network engineer.
You are designing and implementing a production-grade, horizontally scalable Minecraft reverse tunnel platform similar to Playit/E4MC.
The system must:
* Scale from 1k → 50k+ concurrent tunnels
* Support multi-region
* Support Railway auto-scaling
* Be replica-safe
* Be stateless at relay layer
* Be monetization-ready
* Be resilient to instance restarts
* Maintain low latency
* Be designed for high concurrency
This is not a hobby project.
Design for real-world production constraints.
---
# 🎯 SYSTEM OBJECTIVE
Users run a Rust client locally.
They receive a subdomain:
```
sleepy-creeper.eu.dvv.one
```
Players connect to that subdomain.
Traffic is routed through relay nodes to the users home server.
No port forwarding required.
---
# 🏗 GLOBAL ARCHITECTURE REQUIREMENTS
You must design:
1. Rust Relay Service (stateless)
2. Rust Client Agent
3. Redis cluster (shared tunnel state)
4. PostgreSQL (users + billing)
5. Auth API (minimal Rust HTTP service)
6. Stripe billing integration
7. Railway deployment topology
8. Multi-region support
9. Replica-aware routing
10. Graceful autoscaling logic
---
# 🌍 MULTI-REGION MODEL
Regions:
```
eu
us
asia (optional)
```
Each region:
* Has its own relay cluster
* Has its own Railway service group
* Shares central Redis + Postgres (or region-local Redis if specified)
Subdomain format:
```
adjective-noun.{region}.dvv.one
```
---
# 🧱 RELAY SERVICE REQUIREMENTS (RUST)
Relay must:
* Listen on:
* 7000 (tunnel registration)
* 25565 (Minecraft routing)
* Be async (Tokio)
* Be horizontally scalable
* NOT store tunnel state locally
---
## 🔁 TUNNEL STATE MODEL (REDIS)
Redis stores:
```
subdomain → instance_id
instance_id → active tunnel metadata
user_id → plan tier
```
Relay instances must:
* Register themselves in Redis on startup
* Maintain heartbeat key:
```
instance:{id} → alive
```
* Remove stale instance data on startup
---
## 🔄 REPLICA SUPPORT
If multiple relay replicas are running:
* Any relay can receive player connection
* It checks Redis:
```
subdomain → owning instance
```
If tunnel owned by different instance:
* Open internal TCP connection to that instance
* Forward traffic internally
* Do NOT reject connection
Cross-instance routing must:
* Be efficient
* Add minimal latency
* Avoid infinite routing loops
* Avoid recursive forwarding
---
# 🚂 RAILWAY COMPATIBILITY REQUIREMENTS
You MUST design for:
* Ephemeral containers
* No persistent disk
* Dynamic scaling up/down
* Instance restarts at any time
* Non-static internal IPs
Therefore:
* All state in Redis
* No reliance on local memory persistence
* Graceful shutdown support (SIGTERM handler)
* Drain active tunnels before shutdown
---
# 📈 AUTOSCALING LOGIC
When scaling up:
* New instances register in Redis
* New tunnel registrations assigned to least-loaded instance
* Use Redis sorted set for load tracking
When scaling down:
* Instance marks itself as “draining”
* Stops accepting new tunnels
* Waits until active tunnels reach 0
* Exits cleanly
---
# 🧠 CLIENT REQUIREMENTS (RUST)
Client must:
* Benchmark latency to all regions
* Connect to lowest-latency region
* Authenticate using token
* Maintain persistent tunnel
* Reconnect automatically
* Handle relay instance migration
* Support multiple concurrent forwarded connections
* Be under 10MB binary
---
# 💰 SAAS & MONETIZATION LAYER
Free tier:
* 1 tunnel
* Random subdomain
* Idle timeout
* Limited bandwidth
Paid tier:
* Custom subdomain
* Multiple tunnels
* Higher bandwidth cap
* No idle timeout
* Priority routing
---
## 🔐 AUTH MODEL
* JWT tokens
* Client must send token during registration
* Relay verifies token via Redis cache
* Plan limits enforced in real time
---
# 🧮 SCALING TARGETS
Design must handle:
| Level | Active Tunnels | Player Connections |
| ------ | -------------- | ------------------ |
| Small | 1,000 | 5,000 |
| Medium | 10,000 | 50,000 |
| Large | 50,000 | 200,000 |
Include:
* Memory usage estimates
* CPU usage estimates
* Redis throughput estimates
* Bandwidth estimates
---
# ⚡ PERFORMANCE CONSTRAINTS
* No blocking I/O
* No global Mutex<HashMap>
* No heavy frameworks in relay
* Zero-copy forwarding
* Use `tokio::io::copy_bidirectional`
* Minimal allocations in hot path
* Avoid logging per packet
---
# 🛡 FAILURE SCENARIOS TO HANDLE
* Relay instance crash
* Redis temporary outage
* Client reconnect storm
* Player connects during migration
* Instance scaling down mid-traffic
* Partial network partition
Explain how each is handled safely.
---
# 📦 REQUIRED OUTPUT FORMAT
Return:
1. Full system architecture diagram (text-based)
2. Component descriptions
3. Redis schema
4. Cross-instance routing design
5. Autoscaling algorithm
6. Graceful shutdown flow
7. Client architecture
8. SaaS enforcement model
9. Railway deployment layout
10. Cost estimates at 1k / 10k / 50k scale
11. Bottleneck analysis
12. Optimization opportunities
Do NOT provide vague explanations.
Be concrete, structured, and technical.
---
# 🔥 ADVANCED MODE (MANDATORY)
Include:
* Replica-aware routing logic
* Load-based tunnel assignment
* Rate limiting strategy
* DDoS mitigation outline
* Observability plan (metrics + tracing)
* Suggested Prometheus metrics
* Suggested horizontal scaling thresholds
---
# 🚀 DESIGN PRIORITY ORDER
1. Correctness
2. Scalability
3. Low latency
4. Railway compatibility
5. Monetization enforcement
6. Cost efficiency
---
This single prompt forces an AI agent to produce a **real distributed system blueprint**, not toy code.
---
If you want next, I can:
* Design the *ultra-low-latency no-Redis single-region version*
* Or design the *global Anycast + BGP version like major networks*
* Or help you decide whether Railway is actually the right choice for 50k tunnels*
Youre now at infrastructure-architect level thinking.

17
auth-api/Cargo.toml Normal file
View File

@@ -0,0 +1,17 @@
[package]
name = "auth-api"
version = "0.1.0"
edition = "2024"
[dependencies]
anyhow.workspace = true
axum.workspace = true
chrono.workspace = true
jsonwebtoken.workspace = true
redis.workspace = true
serde.workspace = true
serde_json.workspace = true
tokio.workspace = true
tracing.workspace = true
tracing-subscriber.workspace = true
fastrand.workspace = true

250
auth-api/src/main.rs Normal file
View File

@@ -0,0 +1,250 @@
use std::{net::SocketAddr, sync::Arc};
use anyhow::{Context, Result};
use axum::{
Json, Router,
extract::State,
http::StatusCode,
response::IntoResponse,
routing::{get, post},
};
use chrono::{Duration, Utc};
use jsonwebtoken::{Algorithm, DecodingKey, EncodingKey, Header, Validation, decode, encode};
use redis::AsyncCommands;
use serde::{Deserialize, Serialize};
use tracing::{info, warn};
#[derive(Clone)]
struct AppState {
jwt_secret: Arc<String>,
redis: Option<redis::aio::ConnectionManager>,
}
#[derive(Debug, Serialize, Deserialize, Clone)]
struct Claims {
sub: String,
tier: String,
max_tunnels: u32,
exp: usize,
iat: usize,
jti: String,
}
#[derive(Debug, Deserialize)]
struct DevTokenRequest {
user_id: String,
#[serde(default = "default_tier")]
tier: String,
#[serde(default = "default_max_tunnels")]
max_tunnels: u32,
}
#[derive(Debug, Serialize)]
struct TokenResponse {
token: String,
expires_at: i64,
}
#[derive(Debug, Deserialize)]
struct ValidateRequest {
token: String,
}
#[derive(Debug, Serialize)]
struct ValidateResponse {
valid: bool,
user_id: Option<String>,
tier: Option<String>,
max_tunnels: Option<u32>,
}
#[derive(Debug, Deserialize)]
struct StripeWebhookEvent {
event_type: String,
user_id: String,
tier: String,
#[serde(default = "default_max_tunnels")]
max_tunnels: u32,
}
#[tokio::main]
async fn main() -> Result<()> {
tracing_subscriber::fmt()
.with_env_filter(
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| "auth_api=info".into()),
)
.init();
let bind = std::env::var("AUTH_BIND").unwrap_or_else(|_| "0.0.0.0:8080".into());
let jwt_secret = std::env::var("JWT_SECRET").unwrap_or_else(|_| "dev-secret-change-me".into());
let redis = if let Ok(url) = std::env::var("REDIS_URL") {
let client = redis::Client::open(url.clone()).context("open redis client")?;
match redis::aio::ConnectionManager::new(client).await {
Ok(conn) => {
info!("auth-api connected to redis");
Some(conn)
}
Err(e) => {
warn!(error = %e, "auth-api redis unavailable; continuing without cache");
None
}
}
} else {
None
};
let state = AppState {
jwt_secret: Arc::new(jwt_secret),
redis,
};
let app = Router::new()
.route("/healthz", get(healthz))
.route("/v1/token/dev", post(issue_dev_token))
.route("/v1/token/validate", post(validate_token))
.route("/v1/stripe/webhook", post(stripe_webhook))
.with_state(state);
let listener = tokio::net::TcpListener::bind(&bind)
.await
.with_context(|| format!("bind {bind}"))?;
let local_addr: SocketAddr = listener.local_addr()?;
info!(addr = %local_addr, "auth-api listening");
axum::serve(listener, app).await?;
Ok(())
}
async fn healthz() -> &'static str {
"ok"
}
async fn issue_dev_token(
State(state): State<AppState>,
Json(req): Json<DevTokenRequest>,
) -> Result<Json<TokenResponse>, ApiError> {
let now = Utc::now();
let exp = now + Duration::hours(24);
let claims = Claims {
sub: req.user_id,
tier: req.tier,
max_tunnels: req.max_tunnels,
exp: exp.timestamp() as usize,
iat: now.timestamp() as usize,
jti: format!("jti-{}", fastrand::u64(..)),
};
let token = encode(
&Header::default(),
&claims,
&EncodingKey::from_secret(state.jwt_secret.as_bytes()),
)
.map_err(ApiError::internal)?;
if let Some(mut redis) = state.redis.clone() {
let key = format!("auth:jwt:jti:{}", claims.jti);
let ttl = (claims.exp as i64 - now.timestamp()).max(1);
let payload = serde_json::json!({
"user_id": claims.sub,
"plan_tier": claims.tier,
"max_tunnels": claims.max_tunnels
})
.to_string();
let _: () = redis
.set_ex(key, payload, ttl as u64)
.await
.map_err(ApiError::internal)?;
}
Ok(Json(TokenResponse {
token,
expires_at: exp.timestamp(),
}))
}
async fn validate_token(
State(state): State<AppState>,
Json(req): Json<ValidateRequest>,
) -> Result<Json<ValidateResponse>, ApiError> {
let decoded = decode::<Claims>(
&req.token,
&DecodingKey::from_secret(state.jwt_secret.as_bytes()),
&Validation::new(Algorithm::HS256),
);
match decoded {
Ok(tok) => {
let c = tok.claims;
if let Some(mut redis) = state.redis.clone() {
let key = format!("plan:user:{}", c.sub);
let payload = serde_json::json!({
"tier": c.tier,
"max_tunnels": c.max_tunnels,
"source": "auth-api"
})
.to_string();
let _: () = redis.set_ex(key, payload, 300).await.map_err(ApiError::internal)?;
}
Ok(Json(ValidateResponse {
valid: true,
user_id: Some(c.sub),
tier: Some(c.tier),
max_tunnels: Some(c.max_tunnels),
}))
}
Err(_) => Ok(Json(ValidateResponse {
valid: false,
user_id: None,
tier: None,
max_tunnels: None,
})),
}
}
async fn stripe_webhook(
State(state): State<AppState>,
Json(event): Json<StripeWebhookEvent>,
) -> Result<impl IntoResponse, ApiError> {
if let Some(mut redis) = state.redis.clone() {
let key = format!("plan:user:{}", event.user_id);
let payload = serde_json::json!({
"tier": event.tier,
"max_tunnels": event.max_tunnels,
"source": "stripe_webhook",
"last_event_type": event.event_type,
"updated_at": Utc::now().timestamp(),
})
.to_string();
let _: () = redis.set_ex(key, payload, 300).await.map_err(ApiError::internal)?;
}
Ok(StatusCode::NO_CONTENT)
}
#[derive(Debug)]
struct ApiError {
status: StatusCode,
message: String,
}
impl ApiError {
fn internal<E: std::fmt::Display>(e: E) -> Self {
Self {
status: StatusCode::INTERNAL_SERVER_ERROR,
message: e.to_string(),
}
}
}
impl IntoResponse for ApiError {
fn into_response(self) -> axum::response::Response {
(self.status, self.message).into_response()
}
}
fn default_tier() -> String {
"free".to_string()
}
fn default_max_tunnels() -> u32 {
1
}

11
client/Cargo.toml Normal file
View File

@@ -0,0 +1,11 @@
[package]
name = "client"
version = "0.1.0"
edition = "2024"
[dependencies]
anyhow.workspace = true
tokio.workspace = true
tracing.workspace = true
tracing-subscriber.workspace = true
common = { path = "../common" }

254
client/src/main.rs Normal file
View File

@@ -0,0 +1,254 @@
use std::{collections::HashMap, sync::Arc, time::Duration};
use anyhow::{Context, Result};
use common::{
codec::{read_frame, write_frame},
protocol::{
ClientFrame, Heartbeat, RegisterRequest, ServerFrame, StreamClosed, StreamData,
},
};
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
net::TcpStream,
sync::{RwLock, mpsc},
time::{MissedTickBehavior, sleep},
};
use tracing::{error, info, warn};
#[derive(Clone)]
struct ClientConfig {
relay_addr: String,
token: String,
region: String,
local_addr: String,
requested_subdomain: Option<String>,
}
impl ClientConfig {
fn from_env() -> Self {
Self {
relay_addr: std::env::var("DVV_RELAY_ADDR").unwrap_or_else(|_| "127.0.0.1:7000".into()),
token: std::env::var("DVV_TOKEN").unwrap_or_else(|_| "dev-token-local".into()),
region: std::env::var("DVV_REGION").unwrap_or_else(|_| "eu".into()),
local_addr: std::env::var("DVV_LOCAL_ADDR").unwrap_or_else(|_| "127.0.0.1:25565".into()),
requested_subdomain: std::env::var("DVV_SUBDOMAIN").ok(),
}
}
}
type StreamSinkMap = Arc<RwLock<HashMap<String, mpsc::Sender<Vec<u8>>>>>;
#[tokio::main]
async fn main() -> Result<()> {
tracing_subscriber::fmt()
.with_env_filter(
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| "client=info".into()),
)
.init();
let cfg = ClientConfig::from_env();
let mut backoff = Duration::from_millis(300);
loop {
match run_session(cfg.clone()).await {
Ok(()) => warn!("session ended; reconnecting"),
Err(e) => error!(error = %e, "session failed"),
}
sleep(backoff).await;
backoff = (backoff * 2).min(Duration::from_secs(10));
}
}
async fn run_session(cfg: ClientConfig) -> Result<()> {
let stream = TcpStream::connect(&cfg.relay_addr)
.await
.with_context(|| format!("connect relay {}", cfg.relay_addr))?;
let (mut reader, mut writer) = stream.into_split();
write_frame(
&mut writer,
&ClientFrame::Register(RegisterRequest {
token: cfg.token.clone(),
region: cfg.region.clone(),
requested_subdomain: cfg.requested_subdomain.clone(),
local_addr: cfg.local_addr.clone(),
}),
)
.await?;
let accepted = match read_frame::<_, ServerFrame>(&mut reader)
.await
.context("read register response")?
{
ServerFrame::RegisterAccepted(ok) => ok,
ServerFrame::RegisterRejected { reason } => anyhow::bail!("register rejected: {reason}"),
other => anyhow::bail!("unexpected frame at register: {other:?}"),
};
info!(
fqdn = %accepted.fqdn,
session_id = %accepted.session_id,
owner = %accepted.owner_instance_id,
"registered tunnel"
);
let sinks: StreamSinkMap = Arc::new(RwLock::new(HashMap::new()));
let (out_tx, mut out_rx) = mpsc::channel::<ClientFrame>(1024);
let writer_task = tokio::spawn(async move {
while let Some(frame) = out_rx.recv().await {
write_frame(&mut writer, &frame).await?;
}
Ok::<(), anyhow::Error>(())
});
let hb_tx = out_tx.clone();
let hb_sinks = sinks.clone();
let hb_session_id = accepted.session_id.clone();
let hb_interval = Duration::from_secs(accepted.heartbeat_interval_secs.max(1));
let hb_task = tokio::spawn(async move {
let mut ticker = tokio::time::interval(hb_interval);
ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);
loop {
ticker.tick().await;
let active_streams = hb_sinks.read().await.len() as u32;
let frame = ClientFrame::Heartbeat(Heartbeat {
session_id: hb_session_id.clone(),
active_streams,
bytes_in: 0,
bytes_out: 0,
});
if hb_tx.send(frame).await.is_err() {
break;
}
}
});
loop {
let frame: ServerFrame = read_frame(&mut reader).await?;
match frame {
ServerFrame::Ping => {
let _ = out_tx.send(ClientFrame::Pong).await;
}
ServerFrame::IncomingTcp(incoming) => {
let cfg_clone = cfg.clone();
let out_tx_clone = out_tx.clone();
let sinks_clone = sinks.clone();
tokio::spawn(async move {
if let Err(e) = handle_incoming_stream(cfg_clone, incoming, out_tx_clone, sinks_clone).await {
warn!(error = %e, "incoming stream handling failed");
}
});
}
ServerFrame::StreamData(StreamData { stream_id, data }) => {
if let Some(tx) = sinks.read().await.get(&stream_id).cloned() {
if tx.send(data).await.is_err() {
sinks.write().await.remove(&stream_id);
}
}
}
ServerFrame::StreamClosed(StreamClosed { stream_id, .. }) => {
sinks.write().await.remove(&stream_id);
}
ServerFrame::DrainNotice { retry_after_ms, reason } => {
warn!(retry_after_ms, reason = %reason, "relay draining");
break;
}
ServerFrame::Error { message } => warn!(message = %message, "server error frame"),
ServerFrame::RegisterAccepted(_) | ServerFrame::RegisterRejected { .. } => {
warn!("ignoring unexpected register response after session start")
}
}
}
hb_task.abort();
writer_task.abort();
Ok(())
}
async fn handle_incoming_stream(
cfg: ClientConfig,
incoming: common::protocol::IncomingTcp,
out_tx: mpsc::Sender<ClientFrame>,
sinks: StreamSinkMap,
) -> Result<()> {
let mut local = TcpStream::connect(&cfg.local_addr)
.await
.with_context(|| format!("connect local mc {}", cfg.local_addr))?;
if !incoming.initial_data.is_empty() {
local.write_all(&incoming.initial_data).await?;
}
let (local_read, local_write) = local.into_split();
let (to_local_tx, to_local_rx) = mpsc::channel::<Vec<u8>>(128);
sinks.write().await.insert(incoming.stream_id.clone(), to_local_tx);
let stream_id = incoming.stream_id.clone();
let sinks_clone = sinks.clone();
tokio::spawn(async move {
if let Err(e) = run_local_writer(local_write, to_local_rx).await {
warn!(stream_id = %stream_id, error = %e, "local writer ended");
}
sinks_clone.write().await.remove(&stream_id);
});
let stream_id = incoming.stream_id.clone();
let out_tx_clone = out_tx.clone();
let sinks_clone = sinks.clone();
tokio::spawn(async move {
if let Err(e) = run_local_reader(local_read, out_tx_clone.clone(), stream_id.clone()).await {
warn!(stream_id = %stream_id, error = %e, "local reader ended");
}
let _ = out_tx_clone
.send(ClientFrame::StreamClosed(StreamClosed {
stream_id: stream_id.clone(),
reason: Some("local_reader_closed".into()),
}))
.await;
sinks_clone.write().await.remove(&stream_id);
});
info!(
stream_id = %incoming.stream_id,
peer = %incoming.peer_addr,
hostname = %incoming.hostname,
local = %cfg.local_addr,
"connected relay stream to local minecraft"
);
Ok(())
}
async fn run_local_reader(
mut reader: tokio::net::tcp::OwnedReadHalf,
out_tx: mpsc::Sender<ClientFrame>,
stream_id: String,
) -> Result<()> {
let mut buf = vec![0u8; 16 * 1024];
loop {
let n = reader.read(&mut buf).await?;
if n == 0 {
break;
}
out_tx
.send(ClientFrame::StreamData(StreamData {
stream_id: stream_id.clone(),
data: buf[..n].to_vec(),
}))
.await
.context("send local data to relay")?;
}
Ok(())
}
async fn run_local_writer(
mut writer: tokio::net::tcp::OwnedWriteHalf,
mut rx: mpsc::Receiver<Vec<u8>>,
) -> Result<()> {
while let Some(chunk) = rx.recv().await {
writer.write_all(&chunk).await?;
}
writer.shutdown().await.ok();
Ok(())
}

11
common/Cargo.toml Normal file
View File

@@ -0,0 +1,11 @@
[package]
name = "common"
version = "0.1.0"
edition = "2024"
[dependencies]
serde.workspace = true
serde_json.workspace = true
tokio.workspace = true
thiserror.workspace = true
uuid.workspace = true

42
common/src/codec.rs Normal file
View File

@@ -0,0 +1,42 @@
use std::io;
use serde::{Serialize, de::DeserializeOwned};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
const MAX_FRAME_SIZE: usize = 1024 * 1024;
pub async fn write_frame<W, T>(writer: &mut W, value: &T) -> io::Result<()>
where
W: AsyncWrite + Unpin,
T: Serialize,
{
let payload = serde_json::to_vec(value)
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e.to_string()))?;
if payload.len() > MAX_FRAME_SIZE {
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
"frame exceeds max size",
));
}
writer.write_u32(payload.len() as u32).await?;
writer.write_all(&payload).await?;
writer.flush().await
}
pub async fn read_frame<R, T>(reader: &mut R) -> io::Result<T>
where
R: AsyncRead + Unpin,
T: DeserializeOwned,
{
let len = reader.read_u32().await? as usize;
if len > MAX_FRAME_SIZE {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"frame too large",
));
}
let mut payload = vec![0u8; len];
reader.read_exact(&mut payload).await?;
serde_json::from_slice(&payload)
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e.to_string()))
}

3
common/src/lib.rs Normal file
View File

@@ -0,0 +1,3 @@
pub mod codec;
pub mod minecraft;
pub mod protocol;

106
common/src/minecraft.rs Normal file
View File

@@ -0,0 +1,106 @@
use std::io;
use tokio::io::{AsyncRead, AsyncReadExt};
pub async fn read_handshake_hostname<R>(reader: &mut R) -> io::Result<String>
where
R: AsyncRead + Unpin,
{
let (hostname, _) = read_handshake_hostname_and_bytes(reader).await?;
Ok(hostname)
}
pub async fn read_handshake_hostname_and_bytes<R>(reader: &mut R) -> io::Result<(String, Vec<u8>)>
where
R: AsyncRead + Unpin,
{
let (packet_len, packet_len_bytes) = read_varint_async_with_bytes(reader).await?;
let packet_len = packet_len as usize;
if packet_len == 0 || packet_len > 2048 {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"invalid minecraft handshake packet length",
));
}
let mut buf = vec![0u8; packet_len];
reader.read_exact(&mut buf).await?;
let mut cur = &buf[..];
let packet_id = read_varint_slice(&mut cur)?;
if packet_id != 0 {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"expected handshake packet id 0",
));
}
let _protocol_version = read_varint_slice(&mut cur)?;
let host = read_string_slice(&mut cur, 255)?;
if cur.len() < 2 {
return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "missing port"));
}
let _port = u16::from_be_bytes([cur[0], cur[1]]);
cur = &cur[2..];
let _next_state = read_varint_slice(&mut cur)?;
let mut raw = packet_len_bytes;
raw.extend_from_slice(&buf);
Ok((host, raw))
}
async fn read_varint_async_with_bytes<R>(reader: &mut R) -> io::Result<(i32, Vec<u8>)>
where
R: AsyncRead + Unpin,
{
let mut num_read = 0;
let mut result = 0i32;
let mut raw = Vec::with_capacity(5);
loop {
let byte = reader.read_u8().await?;
raw.push(byte);
let value = (byte & 0x7F) as i32;
result |= value << (7 * num_read);
num_read += 1;
if num_read > 5 {
return Err(io::Error::new(io::ErrorKind::InvalidData, "varint too big"));
}
if (byte & 0x80) == 0 {
break;
}
}
Ok((result, raw))
}
fn read_varint_slice(input: &mut &[u8]) -> io::Result<i32> {
let mut num_read = 0;
let mut result = 0i32;
loop {
if input.is_empty() {
return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "eof"));
}
let byte = input[0];
*input = &input[1..];
let value = (byte & 0x7F) as i32;
result |= value << (7 * num_read);
num_read += 1;
if num_read > 5 {
return Err(io::Error::new(io::ErrorKind::InvalidData, "varint too big"));
}
if (byte & 0x80) == 0 {
break;
}
}
Ok(result)
}
fn read_string_slice(input: &mut &[u8], max_len: usize) -> io::Result<String> {
let len = read_varint_slice(input)? as usize;
if len > max_len || input.len() < len {
return Err(io::Error::new(io::ErrorKind::InvalidData, "bad string len"));
}
let bytes = &input[..len];
*input = &input[len..];
String::from_utf8(bytes.to_vec())
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e.to_string()))
}

69
common/src/protocol.rs Normal file
View File

@@ -0,0 +1,69 @@
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RegisterRequest {
pub token: String,
pub region: String,
pub requested_subdomain: Option<String>,
pub local_addr: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RegisterAccepted {
pub session_id: String,
pub fqdn: String,
pub heartbeat_interval_secs: u64,
pub owner_instance_id: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Heartbeat {
pub session_id: String,
pub active_streams: u32,
pub bytes_in: u64,
pub bytes_out: u64,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IncomingTcp {
pub stream_id: String,
pub session_id: String,
pub peer_addr: String,
pub hostname: String,
pub initial_data: Vec<u8>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamData {
pub stream_id: String,
pub data: Vec<u8>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamClosed {
pub stream_id: String,
pub reason: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", content = "data")]
pub enum ClientFrame {
Register(RegisterRequest),
Heartbeat(Heartbeat),
StreamData(StreamData),
StreamClosed(StreamClosed),
Pong,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", content = "data")]
pub enum ServerFrame {
RegisterAccepted(RegisterAccepted),
RegisterRejected { reason: String },
IncomingTcp(IncomingTcp),
StreamData(StreamData),
StreamClosed(StreamClosed),
Ping,
DrainNotice { retry_after_ms: u64, reason: String },
Error { message: String },
}

67
db/schema.sql Normal file
View File

@@ -0,0 +1,67 @@
-- PostgreSQL schema scaffold for dvv tunnel SaaS platform.
create extension if not exists pgcrypto;
create table if not exists users (
id uuid primary key default gen_random_uuid(),
email text unique not null,
password_hash text,
created_at timestamptz not null default now(),
updated_at timestamptz not null default now()
);
create table if not exists plans (
id text primary key,
name text not null,
max_tunnels integer not null,
custom_subdomain boolean not null default false,
idle_timeout_seconds integer,
bandwidth_cap_kbps integer,
priority_routing boolean not null default false,
created_at timestamptz not null default now()
);
create table if not exists subscriptions (
id uuid primary key default gen_random_uuid(),
user_id uuid not null references users(id) on delete cascade,
plan_id text not null references plans(id),
provider text not null default 'stripe',
provider_customer_id text,
provider_subscription_id text,
status text not null,
current_period_end timestamptz,
created_at timestamptz not null default now(),
updated_at timestamptz not null default now()
);
create index if not exists subscriptions_user_id_idx on subscriptions(user_id);
create table if not exists tunnels (
id uuid primary key default gen_random_uuid(),
user_id uuid not null references users(id) on delete cascade,
region text not null check (region in ('eu', 'us', 'asia')),
subdomain text not null,
custom_domain boolean not null default false,
active boolean not null default true,
created_at timestamptz not null default now(),
updated_at timestamptz not null default now(),
unique (region, subdomain)
);
create index if not exists tunnels_user_id_idx on tunnels(user_id);
create table if not exists usage_rollups_hourly (
user_id uuid not null references users(id) on delete cascade,
hour_bucket timestamptz not null,
bytes_in bigint not null default 0,
bytes_out bigint not null default 0,
player_connections integer not null default 0,
primary key (user_id, hour_bucket)
);
insert into plans (id, name, max_tunnels, custom_subdomain, idle_timeout_seconds, bandwidth_cap_kbps, priority_routing)
values
('free', 'Free', 1, false, 900, 512, false),
('pro', 'Pro', 5, true, null, 8192, true),
('team', 'Team', 25, true, null, null, true)
on conflict (id) do nothing;

32
docker-compose.yml Normal file
View File

@@ -0,0 +1,32 @@
version: "3.9"
services:
redis:
image: redis:7-alpine
ports:
- "6379:6379"
postgres:
image: postgres:16-alpine
environment:
POSTGRES_USER: dvv
POSTGRES_PASSWORD: dvv
POSTGRES_DB: dvv
ports:
- "5432:5432"
volumes:
- ./db/schema.sql:/docker-entrypoint-initdb.d/001-schema.sql:ro
auth-api:
build:
context: .
dockerfile: Dockerfile.auth-api
environment:
AUTH_BIND: 0.0.0.0:8080
JWT_SECRET: dev-secret-change-me
REDIS_URL: redis://redis:6379
depends_on:
- redis
ports:
- "8080:8080"

16
relay/Cargo.toml Normal file
View File

@@ -0,0 +1,16 @@
[package]
name = "relay"
version = "0.1.0"
edition = "2024"
[dependencies]
anyhow.workspace = true
tokio.workspace = true
tracing.workspace = true
tracing-subscriber.workspace = true
uuid.workspace = true
fastrand.workspace = true
redis.workspace = true
serde_json.workspace = true
chrono.workspace = true
common = { path = "../common" }

699
relay/src/main.rs Normal file
View File

@@ -0,0 +1,699 @@
use std::{
collections::HashMap,
net::SocketAddr,
sync::Arc,
time::{Duration, Instant},
};
use anyhow::{Context, Result};
use common::{
codec::{read_frame, write_frame},
minecraft::read_handshake_hostname_and_bytes,
protocol::{
ClientFrame, Heartbeat, IncomingTcp, RegisterAccepted, RegisterRequest, ServerFrame,
StreamClosed, StreamData,
},
};
use redis::AsyncCommands;
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
net::{TcpListener, TcpStream},
sync::{Notify, RwLock, mpsc},
time::{MissedTickBehavior, interval, timeout},
};
use tracing::{debug, info, warn};
use uuid::Uuid;
#[derive(Clone)]
struct RelayConfig {
instance_id: String,
region: String,
control_bind: String,
player_bind: String,
domain: String,
heartbeat_timeout: Duration,
registry_ttl_secs: u64,
}
impl RelayConfig {
fn from_env() -> Self {
Self {
instance_id: std::env::var("RELAY_INSTANCE_ID")
.unwrap_or_else(|_| format!("relay-{}", Uuid::new_v4())),
region: std::env::var("RELAY_REGION").unwrap_or_else(|_| "eu".to_string()),
control_bind: std::env::var("RELAY_CONTROL_BIND")
.unwrap_or_else(|_| "0.0.0.0:7000".to_string()),
player_bind: std::env::var("RELAY_PLAYER_BIND")
.unwrap_or_else(|_| "0.0.0.0:25565".to_string()),
domain: std::env::var("RELAY_BASE_DOMAIN").unwrap_or_else(|_| "dvv.one".to_string()),
heartbeat_timeout: Duration::from_secs(
std::env::var("RELAY_HEARTBEAT_TIMEOUT_SECS")
.ok()
.and_then(|v| v.parse().ok())
.unwrap_or(30),
),
registry_ttl_secs: std::env::var("RELAY_REGISTRY_TTL_SECS")
.ok()
.and_then(|v| v.parse().ok())
.unwrap_or(20),
}
}
}
#[derive(Clone)]
struct SessionHandle {
session_id: String,
tx: mpsc::Sender<ServerFrame>,
stream_sinks: Arc<RwLock<HashMap<String, mpsc::Sender<Vec<u8>>>>>,
last_heartbeat: Instant,
}
struct RelayState {
by_fqdn: HashMap<String, SessionHandle>,
by_session: HashMap<String, String>,
}
impl RelayState {
fn new() -> Self {
Self {
by_fqdn: HashMap::new(),
by_session: HashMap::new(),
}
}
fn session_count(&self) -> usize {
self.by_session.len()
}
}
type SharedState = Arc<RwLock<RelayState>>;
#[derive(Clone)]
struct RedisRegistry {
conn: Option<redis::aio::ConnectionManager>,
instance_id: String,
region: String,
control_addr: String,
player_addr: String,
ttl_secs: u64,
}
impl RedisRegistry {
async fn from_env(cfg: &RelayConfig) -> Self {
let conn = match std::env::var("REDIS_URL") {
Ok(url) => match redis::Client::open(url.clone()) {
Ok(client) => match redis::aio::ConnectionManager::new(client).await {
Ok(cm) => {
info!("connected to redis");
Some(cm)
}
Err(e) => {
warn!(error = %e, "redis connection manager failed; continuing without redis");
None
}
},
Err(e) => {
warn!(error = %e, "invalid REDIS_URL; continuing without redis");
None
}
},
Err(_) => None,
};
Self {
conn,
instance_id: cfg.instance_id.clone(),
region: cfg.region.clone(),
control_addr: cfg.control_bind.clone(),
player_addr: cfg.player_bind.clone(),
ttl_secs: cfg.registry_ttl_secs,
}
}
async fn register_instance(&self) {
let Some(mut conn) = self.conn.clone() else {
return;
};
let key = format!("relay:instance:{}", self.instance_id);
let payload = serde_json::json!({
"instance_id": self.instance_id,
"region": self.region,
"status": "active",
"control_addr": self.control_addr,
"player_addr": self.player_addr,
"started_at": chrono::Utc::now().timestamp(),
})
.to_string();
let res: redis::RedisResult<()> = async {
let _: usize = conn.sadd("relay:instances", &self.instance_id).await?;
let _: () = conn.set_ex(&key, payload, self.ttl_secs).await?;
let hb_key = format!("relay:heartbeat:{}", self.instance_id);
let _: () = conn.set_ex(hb_key, "1", self.ttl_secs).await?;
Ok(())
}
.await;
if let Err(e) = res {
warn!(error = %e, "failed to register instance in redis");
}
}
async fn heartbeat_instance(&self, tunnel_count: usize) {
let Some(mut conn) = self.conn.clone() else {
return;
};
let hb_key = format!("relay:heartbeat:{}", self.instance_id);
let load_key = format!("relay:load:{}", self.region);
let score = tunnel_count as f64;
let key = format!("relay:instance:{}", self.instance_id);
let payload = serde_json::json!({
"instance_id": self.instance_id,
"region": self.region,
"status": "active",
"control_addr": self.control_addr,
"player_addr": self.player_addr,
"tunnel_count": tunnel_count,
"updated_at": chrono::Utc::now().timestamp(),
})
.to_string();
let res: redis::RedisResult<()> = async {
let _: () = conn.set_ex(hb_key, "1", self.ttl_secs).await?;
let _: () = conn.set_ex(key, payload, self.ttl_secs).await?;
let _: () = conn.zadd(load_key, &self.instance_id, score).await?;
Ok(())
}
.await;
if let Err(e) = res {
warn!(error = %e, "redis instance heartbeat failed");
}
}
async fn set_draining(&self) {
let Some(mut conn) = self.conn.clone() else {
return;
};
let key = format!("relay:instance:{}", self.instance_id);
let payload = serde_json::json!({
"instance_id": self.instance_id,
"region": self.region,
"status": "draining",
"control_addr": self.control_addr,
"player_addr": self.player_addr,
"updated_at": chrono::Utc::now().timestamp(),
})
.to_string();
let _: redis::RedisResult<()> = async {
let _: () = conn.set_ex(key, payload, self.ttl_secs).await?;
let load_key = format!("relay:load:{}", self.region);
let _: () = conn.zadd(load_key, &self.instance_id, 1e12f64).await?;
Ok(())
}
.await;
}
async fn register_tunnel(&self, fqdn: &str, session_id: &str, user_id: &str) {
let Some(mut conn) = self.conn.clone() else {
return;
};
let key = format!("tunnel:sub:{fqdn}");
let session_key = format!("tunnel:session:{session_id}");
let payload = serde_json::json!({
"instance_id": self.instance_id,
"session_id": session_id,
"user_id": user_id,
"region": self.region,
"fqdn": fqdn,
})
.to_string();
let _: redis::RedisResult<()> = async {
let _: () = conn.set_ex(key, &payload, self.ttl_secs).await?;
let _: () = conn.set_ex(session_key, payload, self.ttl_secs).await?;
Ok(())
}
.await;
}
async fn refresh_tunnel_session(&self, fqdn: &str, session_id: &str, user_id: &str) {
self.register_tunnel(fqdn, session_id, user_id).await;
}
async fn remove_tunnel(&self, fqdn: &str, session_id: &str) {
let Some(mut conn) = self.conn.clone() else {
return;
};
let _: redis::RedisResult<()> = async {
let _: usize = conn.del(format!("tunnel:sub:{fqdn}")).await?;
let _: usize = conn.del(format!("tunnel:session:{session_id}")).await?;
Ok(())
}
.await;
}
}
#[tokio::main]
async fn main() -> Result<()> {
tracing_subscriber::fmt()
.with_env_filter(
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| "relay=info".into()),
)
.init();
let cfg = RelayConfig::from_env();
let registry = RedisRegistry::from_env(&cfg).await;
registry.register_instance().await;
let control_listener = TcpListener::bind(&cfg.control_bind)
.await
.with_context(|| format!("bind control {}", cfg.control_bind))?;
let player_listener = TcpListener::bind(&cfg.player_bind)
.await
.with_context(|| format!("bind player {}", cfg.player_bind))?;
info!(instance_id = %cfg.instance_id, region = %cfg.region, "relay started");
let shutdown = Arc::new(Notify::new());
let state: SharedState = Arc::new(RwLock::new(RelayState::new()));
let heartbeat_task = tokio::spawn(run_registry_heartbeat(
state.clone(),
registry.clone(),
shutdown.clone(),
));
let control_task = tokio::spawn(run_control_accept_loop(
control_listener,
cfg.clone(),
state.clone(),
registry.clone(),
shutdown.clone(),
));
let player_task = tokio::spawn(run_player_accept_loop(
player_listener,
state.clone(),
shutdown.clone(),
));
tokio::pin!(heartbeat_task);
tokio::pin!(control_task);
tokio::pin!(player_task);
tokio::select! {
_ = tokio::signal::ctrl_c() => info!("shutdown signal received"),
res = &mut control_task => warn!("control accept loop ended: {:?}", res),
res = &mut player_task => warn!("player accept loop ended: {:?}", res),
res = &mut heartbeat_task => warn!("registry heartbeat task ended: {:?}", res),
}
registry.set_draining().await;
shutdown.notify_waiters();
info!("draining relay");
tokio::time::sleep(Duration::from_secs(1)).await;
Ok(())
}
async fn run_registry_heartbeat(state: SharedState, registry: RedisRegistry, shutdown: Arc<Notify>) {
let mut ticker = interval(Duration::from_secs(5));
ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
loop {
tokio::select! {
_ = shutdown.notified() => break,
_ = ticker.tick() => {
let count = state.read().await.session_count();
registry.heartbeat_instance(count).await;
}
}
}
}
async fn run_control_accept_loop(
listener: TcpListener,
cfg: RelayConfig,
state: SharedState,
registry: RedisRegistry,
shutdown: Arc<Notify>,
) -> Result<()> {
loop {
tokio::select! {
_ = shutdown.notified() => break,
res = listener.accept() => {
let (stream, addr) = match res {
Ok(v) => v,
Err(e) => { warn!(error = %e, "control accept failed"); continue; }
};
let cfg = cfg.clone();
let state = state.clone();
let registry = registry.clone();
tokio::spawn(async move {
if let Err(e) = handle_control_conn(stream, addr, cfg, state, registry).await {
warn!(peer = %addr, error = %e, "control connection ended with error");
}
});
}
}
}
Ok(())
}
async fn run_player_accept_loop(
listener: TcpListener,
state: SharedState,
shutdown: Arc<Notify>,
) -> Result<()> {
loop {
tokio::select! {
_ = shutdown.notified() => break,
res = listener.accept() => {
let (stream, addr) = match res {
Ok(v) => v,
Err(e) => { warn!(error = %e, "player accept failed"); continue; }
};
let state = state.clone();
tokio::spawn(async move {
if let Err(e) = handle_player_conn(stream, addr, state).await {
debug!(peer = %addr, error = %e, "player connection closed");
}
});
}
}
}
Ok(())
}
async fn handle_control_conn(
stream: TcpStream,
addr: SocketAddr,
cfg: RelayConfig,
state: SharedState,
registry: RedisRegistry,
) -> Result<()> {
let (mut reader, mut writer) = stream.into_split();
let first: ClientFrame = read_frame(&mut reader).await.context("read initial frame")?;
let register = match first {
ClientFrame::Register(req) => req,
_ => {
write_frame(
&mut writer,
&ServerFrame::RegisterRejected { reason: "expected Register frame".to_string() },
).await.ok();
anyhow::bail!("expected register frame");
}
};
if !token_looks_valid(&register.token) {
write_frame(
&mut writer,
&ServerFrame::RegisterRejected { reason: "invalid token".to_string() },
).await.ok();
anyhow::bail!("invalid token");
}
let (tx, mut rx) = mpsc::channel::<ServerFrame>(512);
let session_id = Uuid::new_v4().to_string();
let fqdn = assign_fqdn(&cfg, &register);
let user_id = fake_user_id_from_token(&register.token);
let stream_sinks = Arc::new(RwLock::new(HashMap::<String, mpsc::Sender<Vec<u8>>>::new()));
{
let mut guard = state.write().await;
let handle = SessionHandle {
session_id: session_id.clone(),
tx: tx.clone(),
stream_sinks: stream_sinks.clone(),
last_heartbeat: Instant::now(),
};
guard.by_session.insert(session_id.clone(), fqdn.clone());
guard.by_fqdn.insert(fqdn.clone(), handle);
}
registry.register_tunnel(&fqdn, &session_id, &user_id).await;
write_frame(
&mut writer,
&ServerFrame::RegisterAccepted(RegisterAccepted {
session_id: session_id.clone(),
fqdn: fqdn.clone(),
heartbeat_interval_secs: 5,
owner_instance_id: cfg.instance_id.clone(),
}),
)
.await?;
info!(peer = %addr, user_id = %user_id, fqdn = %fqdn, session_id = %session_id, "client registered");
let write_task = tokio::spawn(async move {
while let Some(frame) = rx.recv().await {
write_frame(&mut writer, &frame).await?;
}
Ok::<(), anyhow::Error>(())
});
let read_result = control_read_loop(
&mut reader,
&state,
&registry,
&session_id,
&fqdn,
&user_id,
cfg.heartbeat_timeout,
)
.await;
if let Err(e) = &read_result {
warn!(session_id = %session_id, error = %e, "control read loop error");
}
{
let mut guard = state.write().await;
if let Some(fqdn) = guard.by_session.remove(&session_id) {
guard.by_fqdn.remove(&fqdn);
}
}
registry.remove_tunnel(&fqdn, &session_id).await;
write_task.abort();
info!(session_id = %session_id, "client session removed");
read_result
}
async fn control_read_loop(
reader: &mut tokio::net::tcp::OwnedReadHalf,
state: &SharedState,
registry: &RedisRegistry,
session_id: &str,
fqdn: &str,
user_id: &str,
heartbeat_timeout: Duration,
) -> Result<()> {
loop {
let frame: ClientFrame = timeout(heartbeat_timeout, read_frame(reader))
.await
.context("heartbeat timeout")??;
match frame {
ClientFrame::Heartbeat(Heartbeat { session_id: hb_id, .. }) => {
if hb_id != session_id {
anyhow::bail!("heartbeat session mismatch");
}
let mut guard = state.write().await;
if let Some(route_fqdn) = guard.by_session.get(session_id).cloned()
&& let Some(handle) = guard.by_fqdn.get_mut(&route_fqdn)
{
handle.last_heartbeat = Instant::now();
}
drop(guard);
registry.refresh_tunnel_session(fqdn, session_id, user_id).await;
}
ClientFrame::StreamData(StreamData { stream_id, data }) => {
let sink = lookup_stream_sink(state, session_id, &stream_id).await;
if let Some(tx) = sink {
if tx.send(data).await.is_err() {
remove_stream_sink(state, session_id, &stream_id).await;
}
}
}
ClientFrame::StreamClosed(StreamClosed { stream_id, .. }) => {
remove_stream_sink(state, session_id, &stream_id).await;
}
ClientFrame::Pong => {}
ClientFrame::Register(_) => anyhow::bail!("unexpected Register frame after registration"),
}
}
}
async fn handle_player_conn(
mut stream: TcpStream,
addr: SocketAddr,
state: SharedState,
) -> Result<()> {
let (hostname, initial_data) = read_handshake_hostname_and_bytes(&mut stream)
.await
.context("parse minecraft handshake")?;
let session = {
let guard = state.read().await;
guard.by_fqdn.get(&hostname).cloned()
};
let Some(session) = session else {
debug!(peer = %addr, hostname = %hostname, "no tunnel for hostname");
return Ok(());
};
let stream_id = Uuid::new_v4().to_string();
let (player_read, player_write) = stream.into_split();
let (to_player_tx, to_player_rx) = mpsc::channel::<Vec<u8>>(128);
session
.stream_sinks
.write()
.await
.insert(stream_id.clone(), to_player_tx);
session
.tx
.send(ServerFrame::IncomingTcp(IncomingTcp {
stream_id: stream_id.clone(),
session_id: session.session_id.clone(),
peer_addr: addr.to_string(),
hostname: hostname.clone(),
initial_data,
}))
.await
.context("send IncomingTcp to client")?;
let tx_control = session.tx.clone();
let stream_id_clone = stream_id.clone();
let session_id_clone = session.session_id.clone();
let sinks = session.stream_sinks.clone();
tokio::spawn(async move {
if let Err(e) = run_player_writer(player_write, to_player_rx).await {
debug!(stream_id = %stream_id_clone, error = %e, "player writer ended");
}
let _ = tx_control
.send(ServerFrame::StreamClosed(StreamClosed {
stream_id: stream_id_clone.clone(),
reason: Some("player_writer_closed".into()),
}))
.await;
let _ = remove_stream_sink_by_store(sinks, &stream_id_clone).await;
let _ = session_id_clone;
});
let tx_control = session.tx.clone();
let stream_id_clone = stream_id.clone();
let sinks = session.stream_sinks.clone();
tokio::spawn(async move {
if let Err(e) = run_player_reader(player_read, tx_control.clone(), stream_id_clone.clone()).await {
debug!(stream_id = %stream_id_clone, error = %e, "player reader ended");
}
let _ = tx_control
.send(ServerFrame::StreamClosed(StreamClosed {
stream_id: stream_id_clone.clone(),
reason: Some("player_reader_closed".into()),
}))
.await;
let _ = remove_stream_sink_by_store(sinks, &stream_id_clone).await;
});
info!(peer = %addr, hostname = %hostname, session_id = %session.session_id, stream_id = %stream_id, "player proxied via client stream");
Ok(())
}
async fn run_player_reader(
mut reader: tokio::net::tcp::OwnedReadHalf,
tx_control: mpsc::Sender<ServerFrame>,
stream_id: String,
) -> Result<()> {
let mut buf = vec![0u8; 16 * 1024];
loop {
let n = reader.read(&mut buf).await?;
if n == 0 {
break;
}
tx_control
.send(ServerFrame::StreamData(StreamData {
stream_id: stream_id.clone(),
data: buf[..n].to_vec(),
}))
.await
.context("send stream data to client")?;
}
Ok(())
}
async fn run_player_writer(
mut writer: tokio::net::tcp::OwnedWriteHalf,
mut rx: mpsc::Receiver<Vec<u8>>,
) -> Result<()> {
while let Some(chunk) = rx.recv().await {
writer.write_all(&chunk).await?;
}
writer.shutdown().await.ok();
Ok(())
}
async fn lookup_stream_sink(
state: &SharedState,
session_id: &str,
stream_id: &str,
) -> Option<mpsc::Sender<Vec<u8>>> {
let store = {
let guard = state.read().await;
let fqdn = guard.by_session.get(session_id)?.clone();
guard.by_fqdn.get(&fqdn)?.stream_sinks.clone()
};
store.read().await.get(stream_id).cloned()
}
async fn remove_stream_sink(state: &SharedState, session_id: &str, stream_id: &str) {
let store = {
let guard = state.read().await;
let Some(fqdn) = guard.by_session.get(session_id).cloned() else {
return;
};
let Some(handle) = guard.by_fqdn.get(&fqdn) else {
return;
};
handle.stream_sinks.clone()
};
let _ = remove_stream_sink_by_store(store, stream_id).await;
}
async fn remove_stream_sink_by_store(
store: Arc<RwLock<HashMap<String, mpsc::Sender<Vec<u8>>>>>,
stream_id: &str,
) -> Option<mpsc::Sender<Vec<u8>>> {
store.write().await.remove(stream_id)
}
fn token_looks_valid(token: &str) -> bool {
!token.trim().is_empty()
}
fn fake_user_id_from_token(token: &str) -> String {
let suffix: String = token.chars().rev().take(6).collect();
format!("user-{}", suffix.chars().rev().collect::<String>())
}
fn assign_fqdn(cfg: &RelayConfig, req: &RegisterRequest) -> String {
let label = req
.requested_subdomain
.as_ref()
.filter(|s| !s.trim().is_empty())
.cloned()
.unwrap_or_else(random_label);
format!("{}.{}.{}", sanitize_label(&label), cfg.region, cfg.domain)
}
fn random_label() -> String {
const ADJ: &[&str] = &["sleepy", "swift", "brave", "quiet", "mossy"];
const NOUN: &[&str] = &["creeper", "ghast", "axolotl", "wolf", "beacon"];
format!(
"{}-{}-{}",
ADJ[fastrand::usize(..ADJ.len())],
NOUN[fastrand::usize(..NOUN.len())],
fastrand::u16(..9999)
)
}
fn sanitize_label(input: &str) -> String {
input
.chars()
.filter(|c| c.is_ascii_alphanumeric() || *c == '-')
.collect::<String>()
.trim_matches('-')
.to_ascii_lowercase()
}