Compare commits

..

3 Commits

Author SHA1 Message Date
L
09205f8db2 chore: update Rust dependencies. 2026-02-23 23:26:57 +00:00
L
e7ef7fdf70 feat: add replica-aware relay-to-relay forwarding 2026-02-23 23:23:22 +00:00
L
050dbc792a feat: scaffold relay client auth workspace 2026-02-23 23:18:56 +00:00
18 changed files with 4102 additions and 0 deletions

5
.gitignore vendored Normal file
View File

@@ -0,0 +1,5 @@
target/
.DS_Store
.env
.env.*
!.env.example

1799
Cargo.lock generated Normal file

File diff suppressed because it is too large Load Diff

21
Cargo.toml Normal file
View File

@@ -0,0 +1,21 @@
[workspace]
members = ["common", "relay", "client", "auth-api"]
resolver = "2"
[workspace.package]
edition = "2024"
[workspace.dependencies]
anyhow = "1.0"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
tokio = { version = "1.44", features = ["full"] }
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter", "fmt"] }
uuid = { version = "1.16", features = ["v4", "serde"] }
thiserror = "2.0"
fastrand = "2.3"
axum = "0.8"
redis = { version = "0.32", features = ["tokio-comp", "connection-manager"] }
jsonwebtoken = "10"
chrono = { version = "0.4", features = ["serde", "clock"] }

12
Dockerfile.auth-api Normal file
View File

@@ -0,0 +1,12 @@
# Multi-stage build: compile auth-api with the full Rust toolchain, then ship
# only the binary on a slim Debian base running as a non-root user.
# Fix: stage keyword casing `as` -> `AS` (Docker FromAsCasing lint warning).
FROM rust:1.86 AS build
WORKDIR /app
COPY . .
RUN cargo build --release -p auth-api

FROM debian:bookworm-slim
# Dedicated unprivileged user for the runtime image.
RUN useradd -r -u 10001 appuser
WORKDIR /app
COPY --from=build /app/target/release/auth-api /usr/local/bin/auth-api
USER appuser
EXPOSE 8080
CMD ["auth-api"]

311
MASTER_PROMPT.md Normal file
View File

@@ -0,0 +1,311 @@
# 🧠 MASTER SYSTEM PROMPT
## Distributed Minecraft Tunnel Platform (Railway-Scalable, Replica-Aware, Monetizable)
---
## ROLE
You are a senior distributed systems architect and Rust network engineer.
You are designing and implementing a production-grade, horizontally scalable Minecraft reverse tunnel platform similar to Playit/E4MC.
The system must:
* Scale from 1k → 50k+ concurrent tunnels
* Support multi-region
* Support Railway auto-scaling
* Be replica-safe
* Be stateless at relay layer
* Be monetization-ready
* Be resilient to instance restarts
* Maintain low latency
* Be designed for high concurrency
This is not a hobby project.
Design for real-world production constraints.
---
# 🎯 SYSTEM OBJECTIVE
Users run a Rust client locally.
They receive a subdomain:
```
sleepy-creeper.eu.dvv.one
```
Players connect to that subdomain.
Traffic is routed through relay nodes to the user's home server.
No port forwarding required.
---
# 🏗 GLOBAL ARCHITECTURE REQUIREMENTS
You must design:
1. Rust Relay Service (stateless)
2. Rust Client Agent
3. Redis cluster (shared tunnel state)
4. PostgreSQL (users + billing)
5. Auth API (minimal Rust HTTP service)
6. Stripe billing integration
7. Railway deployment topology
8. Multi-region support
9. Replica-aware routing
10. Graceful autoscaling logic
---
# 🌍 MULTI-REGION MODEL
Regions:
```
eu
us
asia (optional)
```
Each region:
* Has its own relay cluster
* Has its own Railway service group
* Shares central Redis + Postgres (or region-local Redis if specified)
Subdomain format:
```
adjective-noun.{region}.dvv.one
```
---
# 🧱 RELAY SERVICE REQUIREMENTS (RUST)
Relay must:
* Listen on:
* 7000 (tunnel registration)
* 25565 (Minecraft routing)
* Be async (Tokio)
* Be horizontally scalable
* NOT store tunnel state locally
---
## 🔁 TUNNEL STATE MODEL (REDIS)
Redis stores:
```
subdomain → instance_id
instance_id → active tunnel metadata
user_id → plan tier
```
Relay instances must:
* Register themselves in Redis on startup
* Maintain heartbeat key:
```
instance:{id} → alive
```
* Remove stale instance data on startup
---
## 🔄 REPLICA SUPPORT
If multiple relay replicas are running:
* Any relay can receive player connection
* It checks Redis:
```
subdomain → owning instance
```
If tunnel owned by different instance:
* Open internal TCP connection to that instance
* Forward traffic internally
* Do NOT reject connection
Cross-instance routing must:
* Be efficient
* Add minimal latency
* Avoid infinite routing loops
* Avoid recursive forwarding
---
# 🚂 RAILWAY COMPATIBILITY REQUIREMENTS
You MUST design for:
* Ephemeral containers
* No persistent disk
* Dynamic scaling up/down
* Instance restarts at any time
* Non-static internal IPs
Therefore:
* All state in Redis
* No reliance on local memory persistence
* Graceful shutdown support (SIGTERM handler)
* Drain active tunnels before shutdown
---
# 📈 AUTOSCALING LOGIC
When scaling up:
* New instances register in Redis
* New tunnel registrations assigned to least-loaded instance
* Use Redis sorted set for load tracking
When scaling down:
* Instance marks itself as “draining”
* Stops accepting new tunnels
* Waits until active tunnels reach 0
* Exits cleanly
---
# 🧠 CLIENT REQUIREMENTS (RUST)
Client must:
* Benchmark latency to all regions
* Connect to lowest-latency region
* Authenticate using token
* Maintain persistent tunnel
* Reconnect automatically
* Handle relay instance migration
* Support multiple concurrent forwarded connections
* Be under 10MB binary
---
# 💰 SAAS & MONETIZATION LAYER
Free tier:
* 1 tunnel
* Random subdomain
* Idle timeout
* Limited bandwidth
Paid tier:
* Custom subdomain
* Multiple tunnels
* Higher bandwidth cap
* No idle timeout
* Priority routing
---
## 🔐 AUTH MODEL
* JWT tokens
* Client must send token during registration
* Relay verifies token via Redis cache
* Plan limits enforced in real time
---
# 🧮 SCALING TARGETS
Design must handle:
| Level | Active Tunnels | Player Connections |
| ------ | -------------- | ------------------ |
| Small | 1,000 | 5,000 |
| Medium | 10,000 | 50,000 |
| Large | 50,000 | 200,000 |
Include:
* Memory usage estimates
* CPU usage estimates
* Redis throughput estimates
* Bandwidth estimates
---
# ⚡ PERFORMANCE CONSTRAINTS
* No blocking I/O
* No global Mutex<HashMap>
* No heavy frameworks in relay
* Zero-copy forwarding
* Use `tokio::io::copy_bidirectional`
* Minimal allocations in hot path
* Avoid logging per packet
---
# 🛡 FAILURE SCENARIOS TO HANDLE
* Relay instance crash
* Redis temporary outage
* Client reconnect storm
* Player connects during migration
* Instance scaling down mid-traffic
* Partial network partition
Explain how each is handled safely.
---
# 📦 REQUIRED OUTPUT FORMAT
Return:
1. Full system architecture diagram (text-based)
2. Component descriptions
3. Redis schema
4. Cross-instance routing design
5. Autoscaling algorithm
6. Graceful shutdown flow
7. Client architecture
8. SaaS enforcement model
9. Railway deployment layout
10. Cost estimates at 1k / 10k / 50k scale
11. Bottleneck analysis
12. Optimization opportunities
Do NOT provide vague explanations.
Be concrete, structured, and technical.
---
# 🔥 ADVANCED MODE (MANDATORY)
Include:
* Replica-aware routing logic
* Load-based tunnel assignment
* Rate limiting strategy
* DDoS mitigation outline
* Observability plan (metrics + tracing)
* Suggested Prometheus metrics
* Suggested horizontal scaling thresholds
---
# 🚀 DESIGN PRIORITY ORDER
1. Correctness
2. Scalability
3. Low latency
4. Railway compatibility
5. Monetization enforcement
6. Cost efficiency
---
This single prompt forces an AI agent to produce a **real distributed system blueprint**, not toy code.
---
If you want next, I can:
* Design the *ultra-low-latency no-Redis single-region version*
* Or design the *global Anycast + BGP version like major networks*
* Or help you decide whether Railway is actually the right choice for 50k tunnels
You're now at infrastructure-architect-level thinking.

17
auth-api/Cargo.toml Normal file
View File

@@ -0,0 +1,17 @@
[package]
name = "auth-api"
version = "0.1.0"
edition = "2024"
[dependencies]
anyhow.workspace = true
axum.workspace = true
chrono.workspace = true
jsonwebtoken.workspace = true
redis.workspace = true
serde.workspace = true
serde_json.workspace = true
tokio.workspace = true
tracing.workspace = true
tracing-subscriber.workspace = true
fastrand.workspace = true

250
auth-api/src/main.rs Normal file
View File

@@ -0,0 +1,250 @@
use std::{net::SocketAddr, sync::Arc};
use anyhow::{Context, Result};
use axum::{
Json, Router,
extract::State,
http::StatusCode,
response::IntoResponse,
routing::{get, post},
};
use chrono::{Duration, Utc};
use jsonwebtoken::{Algorithm, DecodingKey, EncodingKey, Header, Validation, decode, encode};
use redis::AsyncCommands;
use serde::{Deserialize, Serialize};
use tracing::{info, warn};
/// Shared handler state: the JWT signing secret and an optional Redis
/// connection (the service degrades gracefully when Redis is absent).
#[derive(Clone)]
struct AppState {
    jwt_secret: Arc<String>,
    redis: Option<redis::aio::ConnectionManager>,
}
/// JWT claims embedded in issued tokens.
#[derive(Debug, Serialize, Deserialize, Clone)]
struct Claims {
    // Subject: the user id.
    sub: String,
    // Plan tier name (e.g. "free").
    tier: String,
    max_tunnels: u32,
    // Expiry / issued-at as unix timestamps.
    exp: usize,
    iat: usize,
    // Unique token id; used as the Redis cache key suffix.
    jti: String,
}
/// Request body for POST /v1/token/dev.
#[derive(Debug, Deserialize)]
struct DevTokenRequest {
    user_id: String,
    #[serde(default = "default_tier")]
    tier: String,
    #[serde(default = "default_max_tunnels")]
    max_tunnels: u32,
}
/// Response body for POST /v1/token/dev.
#[derive(Debug, Serialize)]
struct TokenResponse {
    token: String,
    // Unix timestamp of token expiry.
    expires_at: i64,
}
/// Request body for POST /v1/token/validate.
#[derive(Debug, Deserialize)]
struct ValidateRequest {
    token: String,
}
/// Response body for POST /v1/token/validate; claim fields are None when
/// the token is invalid.
#[derive(Debug, Serialize)]
struct ValidateResponse {
    valid: bool,
    user_id: Option<String>,
    tier: Option<String>,
    max_tunnels: Option<u32>,
}
/// Simplified webhook payload for POST /v1/stripe/webhook.
/// NOTE(review): this is not Stripe's real event envelope — presumably a
/// placeholder until proper Stripe integration lands; confirm.
#[derive(Debug, Deserialize)]
struct StripeWebhookEvent {
    event_type: String,
    user_id: String,
    tier: String,
    #[serde(default = "default_max_tunnels")]
    max_tunnels: u32,
}
#[tokio::main]
async fn main() -> Result<()> {
    // Entry point for auth-api. Configuration via env vars:
    //   AUTH_BIND  - listen address (default 0.0.0.0:8080)
    //   JWT_SECRET - HS256 signing secret (dev default when unset)
    //   REDIS_URL  - optional token/plan cache; service runs without it
    tracing_subscriber::fmt()
        .with_env_filter(
            tracing_subscriber::EnvFilter::try_from_default_env()
                .unwrap_or_else(|_| "auth_api=info".into()),
        )
        .init();
    let bind = std::env::var("AUTH_BIND").unwrap_or_else(|_| "0.0.0.0:8080".into());
    // NOTE(review): falling back to a hard-coded secret is fine for local dev
    // but should be a hard failure in production deployments.
    let jwt_secret = std::env::var("JWT_SECRET").unwrap_or_else(|_| "dev-secret-change-me".into());
    // Redis is optional: connect when configured, warn and continue otherwise.
    let redis = if let Ok(url) = std::env::var("REDIS_URL") {
        let client = redis::Client::open(url.clone()).context("open redis client")?;
        match redis::aio::ConnectionManager::new(client).await {
            Ok(conn) => {
                info!("auth-api connected to redis");
                Some(conn)
            }
            Err(e) => {
                warn!(error = %e, "auth-api redis unavailable; continuing without cache");
                None
            }
        }
    } else {
        None
    };
    let state = AppState {
        jwt_secret: Arc::new(jwt_secret),
        redis,
    };
    let app = Router::new()
        .route("/healthz", get(healthz))
        .route("/v1/token/dev", post(issue_dev_token))
        .route("/v1/token/validate", post(validate_token))
        .route("/v1/stripe/webhook", post(stripe_webhook))
        .with_state(state);
    let listener = tokio::net::TcpListener::bind(&bind)
        .await
        .with_context(|| format!("bind {bind}"))?;
    let local_addr: SocketAddr = listener.local_addr()?;
    info!(addr = %local_addr, "auth-api listening");
    axum::serve(listener, app).await?;
    Ok(())
}
/// Liveness probe endpoint.
async fn healthz() -> &'static str {
    "ok"
}
async fn issue_dev_token(
State(state): State<AppState>,
Json(req): Json<DevTokenRequest>,
) -> Result<Json<TokenResponse>, ApiError> {
let now = Utc::now();
let exp = now + Duration::hours(24);
let claims = Claims {
sub: req.user_id,
tier: req.tier,
max_tunnels: req.max_tunnels,
exp: exp.timestamp() as usize,
iat: now.timestamp() as usize,
jti: format!("jti-{}", fastrand::u64(..)),
};
let token = encode(
&Header::default(),
&claims,
&EncodingKey::from_secret(state.jwt_secret.as_bytes()),
)
.map_err(ApiError::internal)?;
if let Some(mut redis) = state.redis.clone() {
let key = format!("auth:jwt:jti:{}", claims.jti);
let ttl = (claims.exp as i64 - now.timestamp()).max(1);
let payload = serde_json::json!({
"user_id": claims.sub,
"plan_tier": claims.tier,
"max_tunnels": claims.max_tunnels
})
.to_string();
let _: () = redis
.set_ex(key, payload, ttl as u64)
.await
.map_err(ApiError::internal)?;
}
Ok(Json(TokenResponse {
token,
expires_at: exp.timestamp(),
}))
}
async fn validate_token(
State(state): State<AppState>,
Json(req): Json<ValidateRequest>,
) -> Result<Json<ValidateResponse>, ApiError> {
let decoded = decode::<Claims>(
&req.token,
&DecodingKey::from_secret(state.jwt_secret.as_bytes()),
&Validation::new(Algorithm::HS256),
);
match decoded {
Ok(tok) => {
let c = tok.claims;
if let Some(mut redis) = state.redis.clone() {
let key = format!("plan:user:{}", c.sub);
let payload = serde_json::json!({
"tier": c.tier,
"max_tunnels": c.max_tunnels,
"source": "auth-api"
})
.to_string();
let _: () = redis.set_ex(key, payload, 300).await.map_err(ApiError::internal)?;
}
Ok(Json(ValidateResponse {
valid: true,
user_id: Some(c.sub),
tier: Some(c.tier),
max_tunnels: Some(c.max_tunnels),
}))
}
Err(_) => Ok(Json(ValidateResponse {
valid: false,
user_id: None,
tier: None,
max_tunnels: None,
})),
}
}
/// Accepts a (simplified) Stripe-style webhook and refreshes the user's
/// cached plan in Redis with a 5-minute TTL. Returns 204 No Content.
///
/// SECURITY NOTE(review): no Stripe signature verification is performed
/// (`Stripe-Signature` header against the webhook signing secret), so any
/// caller can set any user's plan. This must be fixed before production.
async fn stripe_webhook(
    State(state): State<AppState>,
    Json(event): Json<StripeWebhookEvent>,
) -> Result<impl IntoResponse, ApiError> {
    if let Some(mut redis) = state.redis.clone() {
        let key = format!("plan:user:{}", event.user_id);
        let payload = serde_json::json!({
            "tier": event.tier,
            "max_tunnels": event.max_tunnels,
            "source": "stripe_webhook",
            "last_event_type": event.event_type,
            "updated_at": Utc::now().timestamp(),
        })
        .to_string();
        // Cache write failure surfaces as 500 so Stripe retries the event.
        let _: () = redis.set_ex(key, payload, 300).await.map_err(ApiError::internal)?;
    }
    Ok(StatusCode::NO_CONTENT)
}
/// Minimal API error type mapped onto an HTTP status plus plain-text body.
#[derive(Debug)]
struct ApiError {
    status: StatusCode,
    message: String,
}
impl ApiError {
    /// Wraps any displayable error as a 500 Internal Server Error.
    /// NOTE(review): the raw error text is echoed back to the client via
    /// `into_response`, which may leak internal details (Redis/JWT errors).
    /// Consider logging the detail and returning a generic message instead.
    fn internal<E: std::fmt::Display>(e: E) -> Self {
        Self {
            status: StatusCode::INTERNAL_SERVER_ERROR,
            message: e.to_string(),
        }
    }
}
impl IntoResponse for ApiError {
    // Renders as `(status, plain-text message)`.
    fn into_response(self) -> axum::response::Response {
        (self.status, self.message).into_response()
    }
}
/// Serde default for `DevTokenRequest.tier`: the free plan.
fn default_tier() -> String {
    String::from("free")
}

/// Serde default for `max_tunnels` fields: a single tunnel.
fn default_max_tunnels() -> u32 {
    1
}

11
client/Cargo.toml Normal file
View File

@@ -0,0 +1,11 @@
[package]
name = "client"
version = "0.1.0"
edition = "2024"
[dependencies]
anyhow.workspace = true
tokio.workspace = true
tracing.workspace = true
tracing-subscriber.workspace = true
common = { path = "../common" }

254
client/src/main.rs Normal file
View File

@@ -0,0 +1,254 @@
use std::{collections::HashMap, sync::Arc, time::Duration};
use anyhow::{Context, Result};
use common::{
codec::{read_frame, write_frame},
protocol::{
ClientFrame, Heartbeat, RegisterRequest, ServerFrame, StreamClosed, StreamData,
},
};
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
net::TcpStream,
sync::{RwLock, mpsc},
time::{MissedTickBehavior, sleep},
};
use tracing::{error, info, warn};
/// Client configuration, sourced entirely from environment variables.
#[derive(Clone)]
struct ClientConfig {
    // Relay control-plane address (DVV_RELAY_ADDR, default 127.0.0.1:7000).
    relay_addr: String,
    // Auth token sent during registration (DVV_TOKEN).
    token: String,
    // Target relay region name (DVV_REGION, default "eu").
    region: String,
    // Local Minecraft server to forward traffic to (DVV_LOCAL_ADDR).
    local_addr: String,
    // Optional custom subdomain request (DVV_SUBDOMAIN); None = random.
    requested_subdomain: Option<String>,
}
impl ClientConfig {
    /// Builds a config from env vars, falling back to local-dev defaults.
    fn from_env() -> Self {
        Self {
            relay_addr: std::env::var("DVV_RELAY_ADDR").unwrap_or_else(|_| "127.0.0.1:7000".into()),
            token: std::env::var("DVV_TOKEN").unwrap_or_else(|_| "dev-token-local".into()),
            region: std::env::var("DVV_REGION").unwrap_or_else(|_| "eu".into()),
            local_addr: std::env::var("DVV_LOCAL_ADDR").unwrap_or_else(|_| "127.0.0.1:25565".into()),
            requested_subdomain: std::env::var("DVV_SUBDOMAIN").ok(),
        }
    }
}
type StreamSinkMap = Arc<RwLock<HashMap<String, mpsc::Sender<Vec<u8>>>>>;
#[tokio::main]
async fn main() -> Result<()> {
tracing_subscriber::fmt()
.with_env_filter(
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| "client=info".into()),
)
.init();
let cfg = ClientConfig::from_env();
let mut backoff = Duration::from_millis(300);
loop {
match run_session(cfg.clone()).await {
Ok(()) => warn!("session ended; reconnecting"),
Err(e) => error!(error = %e, "session failed"),
}
sleep(backoff).await;
backoff = (backoff * 2).min(Duration::from_secs(10));
}
}
/// Runs one relay session: registers the tunnel, then dispatches server
/// frames until the connection errors out or the relay announces a drain.
///
/// Returns Ok(()) on a drain-initiated shutdown; I/O and protocol errors
/// surface as Err so the caller can reconnect with backoff.
async fn run_session(cfg: ClientConfig) -> Result<()> {
    let stream = TcpStream::connect(&cfg.relay_addr)
        .await
        .with_context(|| format!("connect relay {}", cfg.relay_addr))?;
    let (mut reader, mut writer) = stream.into_split();
    // First frame must be Register; the relay answers accept or reject.
    write_frame(
        &mut writer,
        &ClientFrame::Register(RegisterRequest {
            token: cfg.token.clone(),
            region: cfg.region.clone(),
            requested_subdomain: cfg.requested_subdomain.clone(),
            local_addr: cfg.local_addr.clone(),
        }),
    )
    .await?;
    let accepted = match read_frame::<_, ServerFrame>(&mut reader)
        .await
        .context("read register response")?
    {
        ServerFrame::RegisterAccepted(ok) => ok,
        ServerFrame::RegisterRejected { reason } => anyhow::bail!("register rejected: {reason}"),
        other => anyhow::bail!("unexpected frame at register: {other:?}"),
    };
    info!(
        fqdn = %accepted.fqdn,
        session_id = %accepted.session_id,
        owner = %accepted.owner_instance_id,
        "registered tunnel"
    );
    // stream_id -> sender feeding that stream's local TCP writer task.
    let sinks: StreamSinkMap = Arc::new(RwLock::new(HashMap::new()));
    // A single writer task serializes all outbound frames onto the socket.
    let (out_tx, mut out_rx) = mpsc::channel::<ClientFrame>(1024);
    let writer_task = tokio::spawn(async move {
        while let Some(frame) = out_rx.recv().await {
            write_frame(&mut writer, &frame).await?;
        }
        Ok::<(), anyhow::Error>(())
    });
    // Periodic heartbeat carrying the current active-stream count.
    // NOTE(review): bytes_in/bytes_out are hard-coded to 0 — byte accounting
    // is not implemented yet.
    let hb_tx = out_tx.clone();
    let hb_sinks = sinks.clone();
    let hb_session_id = accepted.session_id.clone();
    let hb_interval = Duration::from_secs(accepted.heartbeat_interval_secs.max(1));
    let hb_task = tokio::spawn(async move {
        let mut ticker = tokio::time::interval(hb_interval);
        ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);
        loop {
            ticker.tick().await;
            let active_streams = hb_sinks.read().await.len() as u32;
            let frame = ClientFrame::Heartbeat(Heartbeat {
                session_id: hb_session_id.clone(),
                active_streams,
                bytes_in: 0,
                bytes_out: 0,
            });
            if hb_tx.send(frame).await.is_err() {
                // Outbound channel closed: the session is over.
                break;
            }
        }
    });
    // Main read loop: dispatch server frames until error or drain notice.
    loop {
        let frame: ServerFrame = read_frame(&mut reader).await?;
        match frame {
            ServerFrame::Ping => {
                let _ = out_tx.send(ClientFrame::Pong).await;
            }
            ServerFrame::IncomingTcp(incoming) => {
                // New player connection: bridge it to the local server on a
                // separate task so the read loop is never blocked.
                let cfg_clone = cfg.clone();
                let out_tx_clone = out_tx.clone();
                let sinks_clone = sinks.clone();
                tokio::spawn(async move {
                    if let Err(e) = handle_incoming_stream(cfg_clone, incoming, out_tx_clone, sinks_clone).await {
                        warn!(error = %e, "incoming stream handling failed");
                    }
                });
            }
            ServerFrame::StreamData(StreamData { stream_id, data }) => {
                if let Some(tx) = sinks.read().await.get(&stream_id).cloned() {
                    if tx.send(data).await.is_err() {
                        // Local writer ended; drop the dead sink entry.
                        sinks.write().await.remove(&stream_id);
                    }
                }
            }
            ServerFrame::StreamClosed(StreamClosed { stream_id, .. }) => {
                sinks.write().await.remove(&stream_id);
            }
            ServerFrame::DrainNotice { retry_after_ms, reason } => {
                warn!(retry_after_ms, reason = %reason, "relay draining");
                break;
            }
            ServerFrame::Error { message } => warn!(message = %message, "server error frame"),
            ServerFrame::RegisterAccepted(_) | ServerFrame::RegisterRejected { .. } => {
                warn!("ignoring unexpected register response after session start")
            }
        }
    }
    // Tear down helper tasks; their results are intentionally discarded.
    hb_task.abort();
    writer_task.abort();
    Ok(())
}
/// Bridges one relay stream onto a fresh TCP connection to the local
/// Minecraft server.
///
/// Replays `initial_data` (the handshake bytes the relay already consumed
/// for routing), registers a sink for relay->local data, and spawns one
/// pump task per direction. Sink entries are removed when a pump ends.
async fn handle_incoming_stream(
    cfg: ClientConfig,
    incoming: common::protocol::IncomingTcp,
    out_tx: mpsc::Sender<ClientFrame>,
    sinks: StreamSinkMap,
) -> Result<()> {
    let mut local = TcpStream::connect(&cfg.local_addr)
        .await
        .with_context(|| format!("connect local mc {}", cfg.local_addr))?;
    // Replay the already-consumed handshake so the local server sees a
    // complete protocol exchange.
    if !incoming.initial_data.is_empty() {
        local.write_all(&incoming.initial_data).await?;
    }
    let (local_read, local_write) = local.into_split();
    let (to_local_tx, to_local_rx) = mpsc::channel::<Vec<u8>>(128);
    sinks.write().await.insert(incoming.stream_id.clone(), to_local_tx);
    // Pump: relay -> local.
    let stream_id = incoming.stream_id.clone();
    let sinks_clone = sinks.clone();
    tokio::spawn(async move {
        if let Err(e) = run_local_writer(local_write, to_local_rx).await {
            warn!(stream_id = %stream_id, error = %e, "local writer ended");
        }
        sinks_clone.write().await.remove(&stream_id);
    });
    // Pump: local -> relay; tells the relay when the local side closes.
    let stream_id = incoming.stream_id.clone();
    let out_tx_clone = out_tx.clone();
    let sinks_clone = sinks.clone();
    tokio::spawn(async move {
        if let Err(e) = run_local_reader(local_read, out_tx_clone.clone(), stream_id.clone()).await {
            warn!(stream_id = %stream_id, error = %e, "local reader ended");
        }
        let _ = out_tx_clone
            .send(ClientFrame::StreamClosed(StreamClosed {
                stream_id: stream_id.clone(),
                reason: Some("local_reader_closed".into()),
            }))
            .await;
        sinks_clone.write().await.remove(&stream_id);
    });
    info!(
        stream_id = %incoming.stream_id,
        peer = %incoming.peer_addr,
        hostname = %incoming.hostname,
        local = %cfg.local_addr,
        "connected relay stream to local minecraft"
    );
    Ok(())
}
/// Pumps bytes from the local Minecraft server into the relay as
/// `StreamData` frames until the local side reaches EOF.
async fn run_local_reader(
    mut reader: tokio::net::tcp::OwnedReadHalf,
    out_tx: mpsc::Sender<ClientFrame>,
    stream_id: String,
) -> Result<()> {
    let mut chunk = vec![0u8; 16 * 1024];
    loop {
        let read = reader.read(&mut chunk).await?;
        if read == 0 {
            // EOF from the local server: stop pumping.
            return Ok(());
        }
        let frame = ClientFrame::StreamData(StreamData {
            stream_id: stream_id.clone(),
            data: chunk[..read].to_vec(),
        });
        out_tx.send(frame).await.context("send local data to relay")?;
    }
}
/// Drains queued relay bytes into the local Minecraft connection, then
/// shuts the write half down once the channel closes.
async fn run_local_writer(
    mut writer: tokio::net::tcp::OwnedWriteHalf,
    mut rx: mpsc::Receiver<Vec<u8>>,
) -> Result<()> {
    loop {
        match rx.recv().await {
            Some(chunk) => writer.write_all(&chunk).await?,
            None => break,
        }
    }
    // Best-effort shutdown; the peer may already be gone.
    let _ = writer.shutdown().await;
    Ok(())
}

11
common/Cargo.toml Normal file
View File

@@ -0,0 +1,11 @@
[package]
name = "common"
version = "0.1.0"
edition = "2024"
[dependencies]
serde.workspace = true
serde_json.workspace = true
tokio.workspace = true
thiserror.workspace = true
uuid.workspace = true

42
common/src/codec.rs Normal file
View File

@@ -0,0 +1,42 @@
use std::io;
use serde::{Serialize, de::DeserializeOwned};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
const MAX_FRAME_SIZE: usize = 1024 * 1024;
pub async fn write_frame<W, T>(writer: &mut W, value: &T) -> io::Result<()>
where
W: AsyncWrite + Unpin,
T: Serialize,
{
let payload = serde_json::to_vec(value)
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e.to_string()))?;
if payload.len() > MAX_FRAME_SIZE {
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
"frame exceeds max size",
));
}
writer.write_u32(payload.len() as u32).await?;
writer.write_all(&payload).await?;
writer.flush().await
}
/// Reads one length-prefixed JSON frame and deserializes it into `T`.
/// The declared length is validated against `MAX_FRAME_SIZE` before the
/// payload buffer is allocated, bounding memory use per frame.
pub async fn read_frame<R, T>(reader: &mut R) -> io::Result<T>
where
    R: AsyncRead + Unpin,
    T: DeserializeOwned,
{
    let declared = reader.read_u32().await? as usize;
    if declared > MAX_FRAME_SIZE {
        return Err(io::Error::new(
            io::ErrorKind::InvalidData,
            "frame too large",
        ));
    }
    let mut body = vec![0u8; declared];
    reader.read_exact(&mut body).await?;
    serde_json::from_slice(&body)
        .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e.to_string()))
}

3
common/src/lib.rs Normal file
View File

@@ -0,0 +1,3 @@
pub mod codec;
pub mod minecraft;
pub mod protocol;

106
common/src/minecraft.rs Normal file
View File

@@ -0,0 +1,106 @@
use std::io;
use tokio::io::{AsyncRead, AsyncReadExt};
/// Convenience wrapper around `read_handshake_hostname_and_bytes` that
/// discards the raw replay bytes and returns only the hostname.
pub async fn read_handshake_hostname<R>(reader: &mut R) -> io::Result<String>
where
    R: AsyncRead + Unpin,
{
    read_handshake_hostname_and_bytes(reader)
        .await
        .map(|(hostname, _raw)| hostname)
}
/// Parses a Minecraft handshake packet (packet id 0) from `reader`,
/// returning the server-address field and ALL raw bytes consumed (VarInt
/// length prefix + packet body) so they can be replayed to the backend.
///
/// Rejects a declared packet length of 0 or > 2048, a non-zero packet id,
/// and a hostname string longer than 255 bytes.
pub async fn read_handshake_hostname_and_bytes<R>(reader: &mut R) -> io::Result<(String, Vec<u8>)>
where
    R: AsyncRead + Unpin,
{
    // Outer frame: VarInt packet length, keeping the raw prefix bytes.
    let (packet_len, packet_len_bytes) = read_varint_async_with_bytes(reader).await?;
    let packet_len = packet_len as usize;
    if packet_len == 0 || packet_len > 2048 {
        return Err(io::Error::new(
            io::ErrorKind::InvalidData,
            "invalid minecraft handshake packet length",
        ));
    }
    let mut buf = vec![0u8; packet_len];
    reader.read_exact(&mut buf).await?;
    // Parse the body in-memory: id, protocol version, host, port, next state.
    let mut cur = &buf[..];
    let packet_id = read_varint_slice(&mut cur)?;
    if packet_id != 0 {
        return Err(io::Error::new(
            io::ErrorKind::InvalidData,
            "expected handshake packet id 0",
        ));
    }
    let _protocol_version = read_varint_slice(&mut cur)?;
    let host = read_string_slice(&mut cur, 255)?;
    if cur.len() < 2 {
        return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "missing port"));
    }
    let _port = u16::from_be_bytes([cur[0], cur[1]]);
    cur = &cur[2..];
    let _next_state = read_varint_slice(&mut cur)?;
    // Reassemble the exact bytes consumed for replay to the origin server.
    let mut raw = packet_len_bytes;
    raw.extend_from_slice(&buf);
    Ok((host, raw))
}
/// Reads a Minecraft VarInt from `reader`, also returning the raw bytes
/// consumed (needed to replay the handshake to the backend).
///
/// Fix: the length guard now runs BEFORE the shift. The previous version
/// incremented `num_read` first and checked `> 5` afterwards, so a 6th
/// continuation byte reached `value << 35` — a shift of >= 32 bits on an
/// i32, which panics in debug builds instead of returning an error.
async fn read_varint_async_with_bytes<R>(reader: &mut R) -> io::Result<(i32, Vec<u8>)>
where
    R: AsyncRead + Unpin,
{
    let mut num_read = 0;
    let mut result = 0i32;
    let mut raw = Vec::with_capacity(5);
    loop {
        let byte = reader.read_u8().await?;
        raw.push(byte);
        // A VarInt is at most 5 bytes; reject before shifting out of range.
        if num_read >= 5 {
            return Err(io::Error::new(io::ErrorKind::InvalidData, "varint too big"));
        }
        result |= ((byte & 0x7F) as i32) << (7 * num_read);
        num_read += 1;
        if (byte & 0x80) == 0 {
            break;
        }
    }
    Ok((result, raw))
}
/// Decodes a Minecraft VarInt from the front of `input`, advancing the
/// slice past the consumed bytes.
///
/// Fix: the length guard now runs BEFORE the shift. The previous version
/// incremented `num_read` first and checked `> 5` afterwards, so a 6th
/// continuation byte reached `value << 35` — a shift of >= 32 bits on an
/// i32, which panics in debug builds instead of returning an error.
fn read_varint_slice(input: &mut &[u8]) -> io::Result<i32> {
    let mut num_read = 0;
    let mut result = 0i32;
    loop {
        if input.is_empty() {
            return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "eof"));
        }
        let byte = input[0];
        *input = &input[1..];
        // A VarInt is at most 5 bytes; reject before shifting out of range.
        if num_read >= 5 {
            return Err(io::Error::new(io::ErrorKind::InvalidData, "varint too big"));
        }
        result |= ((byte & 0x7F) as i32) << (7 * num_read);
        num_read += 1;
        if (byte & 0x80) == 0 {
            break;
        }
    }
    Ok(result)
}
/// Reads a VarInt-length-prefixed UTF-8 string from `input`, advancing the
/// slice past the consumed bytes. Fails when the declared length exceeds
/// `max_len`, runs past the end of the slice, or the bytes are not UTF-8.
fn read_string_slice(input: &mut &[u8], max_len: usize) -> io::Result<String> {
    let declared = read_varint_slice(input)? as usize;
    if declared > max_len || declared > input.len() {
        return Err(io::Error::new(io::ErrorKind::InvalidData, "bad string len"));
    }
    let (head, tail) = input.split_at(declared);
    *input = tail;
    String::from_utf8(head.to_vec())
        .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e.to_string()))
}

81
common/src/protocol.rs Normal file
View File

@@ -0,0 +1,81 @@
use serde::{Deserialize, Serialize};
/// Sent by the client as the first frame on a new relay connection.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RegisterRequest {
    pub token: String,
    pub region: String,
    // None => the relay assigns a random subdomain.
    pub requested_subdomain: Option<String>,
    // Address of the local Minecraft server the client forwards to.
    pub local_addr: String,
}
/// Relay's positive reply to `RegisterRequest`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RegisterAccepted {
    pub session_id: String,
    // Fully-qualified public hostname assigned to this tunnel.
    pub fqdn: String,
    pub heartbeat_interval_secs: u64,
    // Relay instance that owns this tunnel session.
    pub owner_instance_id: String,
}
/// Periodic client -> relay liveness report.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Heartbeat {
    pub session_id: String,
    pub active_streams: u32,
    pub bytes_in: u64,
    pub bytes_out: u64,
}
/// Relay -> client: a new player TCP connection arrived for this tunnel.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IncomingTcp {
    pub stream_id: String,
    pub session_id: String,
    pub peer_addr: String,
    // Hostname taken from the Minecraft handshake (used for routing).
    pub hostname: String,
    // Bytes already consumed from the player socket (the handshake), which
    // must be replayed to the local server.
    pub initial_data: Vec<u8>,
}
/// A chunk of payload bytes belonging to one multiplexed stream.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamData {
    pub stream_id: String,
    pub data: Vec<u8>,
}
/// Either side closed one multiplexed stream.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamClosed {
    pub stream_id: String,
    pub reason: Option<String>,
}
/// Prelude frame for relay-to-relay forwarding when the receiving relay
/// does not own the target tunnel.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RelayForwardPrelude {
    pub version: u8,
    pub session_id: String,
    pub fqdn: String,
    pub stream_id: String,
    pub peer_addr: String,
    pub origin_instance_id: String,
    // Incremented per hop; presumably used to stop forwarding loops —
    // confirm against the relay's forwarding logic.
    pub hop_count: u8,
    pub initial_data: Vec<u8>,
}
/// Frames sent from client to relay.
/// Serialized as internally-tagged JSON: {"type": ..., "data": ...}.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", content = "data")]
pub enum ClientFrame {
    Register(RegisterRequest),
    Heartbeat(Heartbeat),
    StreamData(StreamData),
    StreamClosed(StreamClosed),
    Pong,
}
/// Frames sent from relay to client.
/// Serialized as internally-tagged JSON: {"type": ..., "data": ...}.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", content = "data")]
pub enum ServerFrame {
    RegisterAccepted(RegisterAccepted),
    RegisterRejected { reason: String },
    IncomingTcp(IncomingTcp),
    StreamData(StreamData),
    StreamClosed(StreamClosed),
    Ping,
    // Relay is draining/shutting down; the client should reconnect.
    DrainNotice { retry_after_ms: u64, reason: String },
    Error { message: String },
}

67
db/schema.sql Normal file
View File

@@ -0,0 +1,67 @@
-- PostgreSQL schema scaffold for dvv tunnel SaaS platform.
-- pgcrypto provides gen_random_uuid() used as the default uuid PK generator.
create extension if not exists pgcrypto;
-- Account records. password_hash is nullable — presumably to allow
-- external/OAuth-only sign-in; confirm the intended auth flows.
create table if not exists users (
id uuid primary key default gen_random_uuid(),
email text unique not null,
password_hash text,
created_at timestamptz not null default now(),
updated_at timestamptz not null default now()
);
-- Plan catalog. Nullable idle_timeout_seconds / bandwidth_cap_kbps mean
-- "no timeout" / "no cap" respectively (see seed rows below).
create table if not exists plans (
id text primary key,
name text not null,
max_tunnels integer not null,
custom_subdomain boolean not null default false,
idle_timeout_seconds integer,
bandwidth_cap_kbps integer,
priority_routing boolean not null default false,
created_at timestamptz not null default now()
);
-- Billing-provider subscription state per user (defaults to Stripe).
create table if not exists subscriptions (
id uuid primary key default gen_random_uuid(),
user_id uuid not null references users(id) on delete cascade,
plan_id text not null references plans(id),
provider text not null default 'stripe',
provider_customer_id text,
provider_subscription_id text,
status text not null,
current_period_end timestamptz,
created_at timestamptz not null default now(),
updated_at timestamptz not null default now()
);
create index if not exists subscriptions_user_id_idx on subscriptions(user_id);
-- Tunnel registrations; a subdomain is unique within its region.
create table if not exists tunnels (
id uuid primary key default gen_random_uuid(),
user_id uuid not null references users(id) on delete cascade,
region text not null check (region in ('eu', 'us', 'asia')),
subdomain text not null,
custom_domain boolean not null default false,
active boolean not null default true,
created_at timestamptz not null default now(),
updated_at timestamptz not null default now(),
unique (region, subdomain)
);
create index if not exists tunnels_user_id_idx on tunnels(user_id);
-- Hourly usage aggregates per user, keyed by (user, hour bucket).
create table if not exists usage_rollups_hourly (
user_id uuid not null references users(id) on delete cascade,
hour_bucket timestamptz not null,
bytes_in bigint not null default 0,
bytes_out bigint not null default 0,
player_connections integer not null default 0,
primary key (user_id, hour_bucket)
);
-- Idempotent seed of the three launch plans.
insert into plans (id, name, max_tunnels, custom_subdomain, idle_timeout_seconds, bandwidth_cap_kbps, priority_routing)
values
('free', 'Free', 1, false, 900, 512, false),
('pro', 'Pro', 5, true, null, 8192, true),
('team', 'Team', 25, true, null, null, true)
on conflict (id) do nothing;

32
docker-compose.yml Normal file
View File

@@ -0,0 +1,32 @@
# Local development stack: Redis (tunnel state), Postgres (users/billing),
# and the auth-api service. Relays and clients run from cargo directly.
# Fix: removed the obsolete top-level `version` key — Compose v2 ignores it
# and emits a deprecation warning.
services:
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
  postgres:
    image: postgres:16-alpine
    environment:
      POSTGRES_USER: dvv
      POSTGRES_PASSWORD: dvv
      POSTGRES_DB: dvv
    ports:
      - "5432:5432"
    volumes:
      # Schema is applied automatically on first container start.
      - ./db/schema.sql:/docker-entrypoint-initdb.d/001-schema.sql:ro
  auth-api:
    build:
      context: .
      dockerfile: Dockerfile.auth-api
    environment:
      AUTH_BIND: 0.0.0.0:8080
      # Dev-only secret; override for any shared environment.
      JWT_SECRET: dev-secret-change-me
      REDIS_URL: redis://redis:6379
    depends_on:
      - redis
    ports:
      - "8080:8080"

17
relay/Cargo.toml Normal file
View File

@@ -0,0 +1,17 @@
[package]
name = "relay"
version = "0.1.0"
edition = "2024"
[dependencies]
anyhow.workspace = true
tokio.workspace = true
tracing.workspace = true
tracing-subscriber.workspace = true
uuid.workspace = true
fastrand.workspace = true
redis.workspace = true
serde_json.workspace = true
chrono.workspace = true
serde.workspace = true
common = { path = "../common" }

1063
relay/src/main.rs Normal file

File diff suppressed because it is too large Load Diff