Compare commits
9 Commits
b4942e4ab1
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
230a9212fe | ||
|
|
28918880da | ||
|
|
37090d80b0 | ||
|
|
a45a9b0392 | ||
|
|
4ce94a5b17 | ||
|
|
fe8376dd6d | ||
|
|
09205f8db2 | ||
|
|
e7ef7fdf70 | ||
|
|
050dbc792a |
5
.gitignore
vendored
Normal file
5
.gitignore
vendored
Normal file
@@ -0,0 +1,5 @@
|
|||||||
|
target/
|
||||||
|
.DS_Store
|
||||||
|
.env
|
||||||
|
.env.*
|
||||||
|
!.env.example
|
||||||
2674
Cargo.lock
generated
Normal file
2674
Cargo.lock
generated
Normal file
File diff suppressed because it is too large
Load Diff
24
Cargo.toml
Normal file
24
Cargo.toml
Normal file
@@ -0,0 +1,24 @@
|
|||||||
|
[workspace]
|
||||||
|
members = ["common", "relay", "client", "auth-api"]
|
||||||
|
resolver = "2"
|
||||||
|
|
||||||
|
[workspace.package]
|
||||||
|
edition = "2024"
|
||||||
|
|
||||||
|
[workspace.dependencies]
|
||||||
|
anyhow = "1.0"
|
||||||
|
serde = { version = "1.0", features = ["derive"] }
|
||||||
|
serde_json = "1.0"
|
||||||
|
tokio = { version = "1.44", features = ["full"] }
|
||||||
|
tracing = "0.1"
|
||||||
|
tracing-subscriber = { version = "0.3", features = ["env-filter", "fmt"] }
|
||||||
|
uuid = { version = "1.16", features = ["v4", "serde"] }
|
||||||
|
thiserror = "2.0"
|
||||||
|
fastrand = "2.3"
|
||||||
|
axum = "0.8"
|
||||||
|
redis = { version = "0.32", features = ["tokio-comp", "connection-manager"] }
|
||||||
|
jsonwebtoken = "10"
|
||||||
|
chrono = { version = "0.4", features = ["serde", "clock"] }
|
||||||
|
metrics = "0.24"
|
||||||
|
metrics-exporter-prometheus = "0.17"
|
||||||
|
tokio-postgres = { version = "0.7", features = ["with-chrono-0_4", "with-serde_json-1"] }
|
||||||
12
Dockerfile.auth-api
Normal file
12
Dockerfile.auth-api
Normal file
@@ -0,0 +1,12 @@
|
|||||||
|
FROM rust:1.86 as build
|
||||||
|
WORKDIR /app
|
||||||
|
COPY . .
|
||||||
|
RUN cargo build --release -p auth-api
|
||||||
|
|
||||||
|
FROM debian:bookworm-slim
|
||||||
|
RUN useradd -r -u 10001 appuser
|
||||||
|
WORKDIR /app
|
||||||
|
COPY --from=build /app/target/release/auth-api /usr/local/bin/auth-api
|
||||||
|
USER appuser
|
||||||
|
EXPOSE 8080
|
||||||
|
CMD ["auth-api"]
|
||||||
311
MASTER_PROMPT.md
Normal file
311
MASTER_PROMPT.md
Normal file
@@ -0,0 +1,311 @@
|
|||||||
|
# 🧠 MASTER SYSTEM PROMPT
|
||||||
|
|
||||||
|
## Distributed Minecraft Tunnel Platform (Railway-Scalable, Replica-Aware, Monetizable)
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## ROLE
|
||||||
|
|
||||||
|
You are a senior distributed systems architect and Rust network engineer.
|
||||||
|
|
||||||
|
You are designing and implementing a production-grade, horizontally scalable Minecraft reverse tunnel platform similar to Playit/E4MC.
|
||||||
|
|
||||||
|
The system must:
|
||||||
|
|
||||||
|
* Scale from 1k → 50k+ concurrent tunnels
|
||||||
|
* Support multi-region
|
||||||
|
* Support Railway auto-scaling
|
||||||
|
* Be replica-safe
|
||||||
|
* Be stateless at relay layer
|
||||||
|
* Be monetization-ready
|
||||||
|
* Be resilient to instance restarts
|
||||||
|
* Maintain low latency
|
||||||
|
* Be designed for high concurrency
|
||||||
|
|
||||||
|
This is not a hobby project.
|
||||||
|
Design for real-world production constraints.
|
||||||
|
|
||||||
|
---
|
||||||
|
# 🎯 SYSTEM OBJECTIVE
|
||||||
|
|
||||||
|
Users run a Rust client locally.
|
||||||
|
They receive a subdomain:
|
||||||
|
|
||||||
|
```
|
||||||
|
sleepy-creeper.eu.dvv.one
|
||||||
|
```
|
||||||
|
|
||||||
|
Players connect to that subdomain.
|
||||||
|
Traffic is routed through relay nodes to the user’s home server.
|
||||||
|
|
||||||
|
No port forwarding required.
|
||||||
|
|
||||||
|
---
|
||||||
|
# 🏗 GLOBAL ARCHITECTURE REQUIREMENTS
|
||||||
|
|
||||||
|
You must design:
|
||||||
|
|
||||||
|
1. Rust Relay Service (stateless)
|
||||||
|
2. Rust Client Agent
|
||||||
|
3. Redis cluster (shared tunnel state)
|
||||||
|
4. PostgreSQL (users + billing)
|
||||||
|
5. Auth API (minimal Rust HTTP service)
|
||||||
|
6. Stripe billing integration
|
||||||
|
7. Railway deployment topology
|
||||||
|
8. Multi-region support
|
||||||
|
9. Replica-aware routing
|
||||||
|
10. Graceful autoscaling logic
|
||||||
|
|
||||||
|
---
|
||||||
|
# 🌍 MULTI-REGION MODEL
|
||||||
|
|
||||||
|
Regions:
|
||||||
|
|
||||||
|
```
|
||||||
|
eu
|
||||||
|
us
|
||||||
|
asia (optional)
|
||||||
|
```
|
||||||
|
|
||||||
|
Each region:
|
||||||
|
|
||||||
|
* Has its own relay cluster
|
||||||
|
* Has its own Railway service group
|
||||||
|
* Shares central Redis + Postgres (or region-local Redis if specified)
|
||||||
|
|
||||||
|
Subdomain format:
|
||||||
|
|
||||||
|
```
|
||||||
|
adjective-noun.{region}.dvv.one
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
# 🧱 RELAY SERVICE REQUIREMENTS (RUST)
|
||||||
|
|
||||||
|
Relay must:
|
||||||
|
|
||||||
|
* Listen on:
|
||||||
|
* 7000 (tunnel registration)
|
||||||
|
* 25565 (Minecraft routing)
|
||||||
|
* Be async (Tokio)
|
||||||
|
* Be horizontally scalable
|
||||||
|
* NOT store tunnel state locally
|
||||||
|
|
||||||
|
---
|
||||||
|
## 🔁 TUNNEL STATE MODEL (REDIS)
|
||||||
|
|
||||||
|
Redis stores:
|
||||||
|
|
||||||
|
```
|
||||||
|
subdomain → instance_id
|
||||||
|
instance_id → active tunnel metadata
|
||||||
|
user_id → plan tier
|
||||||
|
```
|
||||||
|
|
||||||
|
Relay instances must:
|
||||||
|
|
||||||
|
* Register themselves in Redis on startup
|
||||||
|
* Maintain heartbeat key:
|
||||||
|
|
||||||
|
```
|
||||||
|
instance:{id} → alive
|
||||||
|
```
|
||||||
|
* Remove stale instance data on startup
|
||||||
|
|
||||||
|
---
|
||||||
|
## 🔄 REPLICA SUPPORT
|
||||||
|
|
||||||
|
If multiple relay replicas are running:
|
||||||
|
|
||||||
|
* Any relay can receive player connection
|
||||||
|
* It checks Redis:
|
||||||
|
|
||||||
|
```
|
||||||
|
subdomain → owning instance
|
||||||
|
```
|
||||||
|
|
||||||
|
If tunnel owned by different instance:
|
||||||
|
|
||||||
|
* Open internal TCP connection to that instance
|
||||||
|
* Forward traffic internally
|
||||||
|
* Do NOT reject connection
|
||||||
|
|
||||||
|
Cross-instance routing must:
|
||||||
|
|
||||||
|
* Be efficient
|
||||||
|
* Add minimal latency
|
||||||
|
* Avoid infinite routing loops
|
||||||
|
* Avoid recursive forwarding
|
||||||
|
|
||||||
|
---
|
||||||
|
# 🚂 RAILWAY COMPATIBILITY REQUIREMENTS
|
||||||
|
|
||||||
|
You MUST design for:
|
||||||
|
|
||||||
|
* Ephemeral containers
|
||||||
|
* No persistent disk
|
||||||
|
* Dynamic scaling up/down
|
||||||
|
* Instance restarts at any time
|
||||||
|
* Non-static internal IPs
|
||||||
|
|
||||||
|
Therefore:
|
||||||
|
|
||||||
|
* All state in Redis
|
||||||
|
* No reliance on local memory persistence
|
||||||
|
* Graceful shutdown support (SIGTERM handler)
|
||||||
|
* Drain active tunnels before shutdown
|
||||||
|
|
||||||
|
---
|
||||||
|
# 📈 AUTOSCALING LOGIC
|
||||||
|
|
||||||
|
When scaling up:
|
||||||
|
|
||||||
|
* New instances register in Redis
|
||||||
|
* New tunnel registrations assigned to least-loaded instance
|
||||||
|
* Use Redis sorted set for load tracking
|
||||||
|
|
||||||
|
When scaling down:
|
||||||
|
|
||||||
|
* Instance marks itself as “draining”
|
||||||
|
* Stops accepting new tunnels
|
||||||
|
* Waits until active tunnels reach 0
|
||||||
|
* Exits cleanly
|
||||||
|
|
||||||
|
---
|
||||||
|
# 🧠 CLIENT REQUIREMENTS (RUST)
|
||||||
|
|
||||||
|
Client must:
|
||||||
|
|
||||||
|
* Benchmark latency to all regions
|
||||||
|
* Connect to lowest-latency region
|
||||||
|
* Authenticate using token
|
||||||
|
* Maintain persistent tunnel
|
||||||
|
* Reconnect automatically
|
||||||
|
* Handle relay instance migration
|
||||||
|
* Support multiple concurrent forwarded connections
|
||||||
|
* Be under 10MB binary
|
||||||
|
|
||||||
|
---
|
||||||
|
# 💰 SAAS & MONETIZATION LAYER
|
||||||
|
|
||||||
|
Free tier:
|
||||||
|
|
||||||
|
* 1 tunnel
|
||||||
|
* Random subdomain
|
||||||
|
* Idle timeout
|
||||||
|
* Limited bandwidth
|
||||||
|
|
||||||
|
Paid tier:
|
||||||
|
|
||||||
|
* Custom subdomain
|
||||||
|
* Multiple tunnels
|
||||||
|
* Higher bandwidth cap
|
||||||
|
* No idle timeout
|
||||||
|
* Priority routing
|
||||||
|
|
||||||
|
---
|
||||||
|
## 🔐 AUTH MODEL
|
||||||
|
|
||||||
|
* JWT tokens
|
||||||
|
* Client must send token during registration
|
||||||
|
* Relay verifies token via Redis cache
|
||||||
|
* Plan limits enforced in real time
|
||||||
|
|
||||||
|
---
|
||||||
|
# 🧮 SCALING TARGETS
|
||||||
|
|
||||||
|
Design must handle:
|
||||||
|
|
||||||
|
| Level | Active Tunnels | Player Connections |
|
||||||
|
| ------ | -------------- | ------------------ |
|
||||||
|
| Small | 1,000 | 5,000 |
|
||||||
|
| Medium | 10,000 | 50,000 |
|
||||||
|
| Large | 50,000 | 200,000 |
|
||||||
|
|
||||||
|
Include:
|
||||||
|
|
||||||
|
* Memory usage estimates
|
||||||
|
* CPU usage estimates
|
||||||
|
* Redis throughput estimates
|
||||||
|
* Bandwidth estimates
|
||||||
|
|
||||||
|
---
|
||||||
|
# ⚡ PERFORMANCE CONSTRAINTS
|
||||||
|
|
||||||
|
* No blocking I/O
|
||||||
|
* No global Mutex<HashMap>
|
||||||
|
* No heavy frameworks in relay
|
||||||
|
* Zero-copy forwarding
|
||||||
|
* Use `tokio::io::copy_bidirectional`
|
||||||
|
* Minimal allocations in hot path
|
||||||
|
* Avoid logging per packet
|
||||||
|
|
||||||
|
---
|
||||||
|
# 🛡 FAILURE SCENARIOS TO HANDLE
|
||||||
|
|
||||||
|
* Relay instance crash
|
||||||
|
* Redis temporary outage
|
||||||
|
* Client reconnect storm
|
||||||
|
* Player connects during migration
|
||||||
|
* Instance scaling down mid-traffic
|
||||||
|
* Partial network partition
|
||||||
|
|
||||||
|
Explain how each is handled safely.
|
||||||
|
|
||||||
|
---
|
||||||
|
# 📦 REQUIRED OUTPUT FORMAT
|
||||||
|
|
||||||
|
Return:
|
||||||
|
|
||||||
|
1. Full system architecture diagram (text-based)
|
||||||
|
2. Component descriptions
|
||||||
|
3. Redis schema
|
||||||
|
4. Cross-instance routing design
|
||||||
|
5. Autoscaling algorithm
|
||||||
|
6. Graceful shutdown flow
|
||||||
|
7. Client architecture
|
||||||
|
8. SaaS enforcement model
|
||||||
|
9. Railway deployment layout
|
||||||
|
10. Cost estimates at 1k / 10k / 50k scale
|
||||||
|
11. Bottleneck analysis
|
||||||
|
12. Optimization opportunities
|
||||||
|
|
||||||
|
Do NOT provide vague explanations.
|
||||||
|
Be concrete, structured, and technical.
|
||||||
|
|
||||||
|
---
|
||||||
|
# 🔥 ADVANCED MODE (MANDATORY)
|
||||||
|
|
||||||
|
Include:
|
||||||
|
|
||||||
|
* Replica-aware routing logic
|
||||||
|
* Load-based tunnel assignment
|
||||||
|
* Rate limiting strategy
|
||||||
|
* DDoS mitigation outline
|
||||||
|
* Observability plan (metrics + tracing)
|
||||||
|
* Suggested Prometheus metrics
|
||||||
|
* Suggested horizontal scaling thresholds
|
||||||
|
|
||||||
|
---
|
||||||
|
# 🚀 DESIGN PRIORITY ORDER
|
||||||
|
|
||||||
|
1. Correctness
|
||||||
|
2. Scalability
|
||||||
|
3. Low latency
|
||||||
|
4. Railway compatibility
|
||||||
|
5. Monetization enforcement
|
||||||
|
6. Cost efficiency
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
This single prompt forces an AI agent to produce a **real distributed system blueprint**, not toy code.
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
If you want next, I can:
|
||||||
|
|
||||||
|
* Design the *ultra-low-latency no-Redis single-region version*
|
||||||
|
* Or design the *global Anycast + BGP version like major networks*
|
||||||
|
* Or help you decide whether Railway is actually the right choice for 50k tunnels*
|
||||||
|
|
||||||
|
You’re now at infrastructure-architect level thinking.
|
||||||
21
auth-api/Cargo.toml
Normal file
21
auth-api/Cargo.toml
Normal file
@@ -0,0 +1,21 @@
|
|||||||
|
[package]
|
||||||
|
name = "auth-api"
|
||||||
|
version = "0.1.0"
|
||||||
|
edition = "2024"
|
||||||
|
|
||||||
|
[dependencies]
|
||||||
|
anyhow.workspace = true
|
||||||
|
axum.workspace = true
|
||||||
|
chrono.workspace = true
|
||||||
|
jsonwebtoken.workspace = true
|
||||||
|
redis.workspace = true
|
||||||
|
serde.workspace = true
|
||||||
|
serde_json.workspace = true
|
||||||
|
tokio.workspace = true
|
||||||
|
tracing.workspace = true
|
||||||
|
tracing-subscriber.workspace = true
|
||||||
|
fastrand.workspace = true
|
||||||
|
metrics.workspace = true
|
||||||
|
metrics-exporter-prometheus.workspace = true
|
||||||
|
tokio-postgres.workspace = true
|
||||||
|
uuid.workspace = true
|
||||||
333
auth-api/src/main.rs
Normal file
333
auth-api/src/main.rs
Normal file
@@ -0,0 +1,333 @@
|
|||||||
|
use std::{net::SocketAddr, sync::Arc, time::Instant};
|
||||||
|
|
||||||
|
use anyhow::{Context, Result};
|
||||||
|
use axum::{
|
||||||
|
Json, Router,
|
||||||
|
extract::State,
|
||||||
|
http::StatusCode,
|
||||||
|
response::IntoResponse,
|
||||||
|
routing::{get, post},
|
||||||
|
};
|
||||||
|
use chrono::{Duration, Utc};
|
||||||
|
use jsonwebtoken::{Algorithm, DecodingKey, EncodingKey, Header, Validation, decode, encode};
|
||||||
|
use metrics_exporter_prometheus::{PrometheusBuilder, PrometheusHandle};
|
||||||
|
use redis::AsyncCommands;
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
use tokio_postgres::{Client as PgClient, NoTls};
|
||||||
|
use tracing::{info, warn};
|
||||||
|
use uuid::Uuid;
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
struct AppState {
|
||||||
|
jwt_secret: Arc<String>,
|
||||||
|
redis: Option<redis::aio::ConnectionManager>,
|
||||||
|
pg: Option<Arc<PgClient>>,
|
||||||
|
metrics: PrometheusHandle,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||||
|
struct Claims {
|
||||||
|
sub: String,
|
||||||
|
tier: String,
|
||||||
|
max_tunnels: u32,
|
||||||
|
exp: usize,
|
||||||
|
iat: usize,
|
||||||
|
jti: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Deserialize)]
|
||||||
|
struct DevTokenRequest {
|
||||||
|
user_id: String,
|
||||||
|
#[serde(default = "default_tier")]
|
||||||
|
tier: String,
|
||||||
|
#[serde(default = "default_max_tunnels")]
|
||||||
|
max_tunnels: u32,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Serialize)]
|
||||||
|
struct TokenResponse {
|
||||||
|
token: String,
|
||||||
|
expires_at: i64,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Deserialize)]
|
||||||
|
struct ValidateRequest {
|
||||||
|
token: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Serialize)]
|
||||||
|
struct ValidateResponse {
|
||||||
|
valid: bool,
|
||||||
|
user_id: Option<String>,
|
||||||
|
tier: Option<String>,
|
||||||
|
max_tunnels: Option<u32>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
struct PlanEntitlement {
|
||||||
|
tier: String,
|
||||||
|
max_tunnels: u32,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::main]
|
||||||
|
async fn main() -> Result<()> {
|
||||||
|
tracing_subscriber::fmt()
|
||||||
|
.with_env_filter(
|
||||||
|
tracing_subscriber::EnvFilter::try_from_default_env()
|
||||||
|
.unwrap_or_else(|_| "auth_api=info".into()),
|
||||||
|
)
|
||||||
|
.init();
|
||||||
|
|
||||||
|
let bind = std::env::var("AUTH_BIND").unwrap_or_else(|_| "0.0.0.0:8080".into());
|
||||||
|
let jwt_secret = std::env::var("JWT_SECRET").unwrap_or_else(|_| "dev-secret-change-me".into());
|
||||||
|
let metrics = PrometheusBuilder::new()
|
||||||
|
.install_recorder()
|
||||||
|
.context("install prometheus recorder")?;
|
||||||
|
|
||||||
|
let redis = if let Ok(url) = std::env::var("REDIS_URL") {
|
||||||
|
let client = redis::Client::open(url.clone()).context("open redis client")?;
|
||||||
|
match redis::aio::ConnectionManager::new(client).await {
|
||||||
|
Ok(conn) => {
|
||||||
|
info!("auth-api connected to redis");
|
||||||
|
Some(conn)
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
warn!(error = %e, "auth-api redis unavailable; continuing without cache");
|
||||||
|
None
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
};
|
||||||
|
|
||||||
|
let pg = connect_postgres().await?;
|
||||||
|
|
||||||
|
let state = AppState {
|
||||||
|
jwt_secret: Arc::new(jwt_secret),
|
||||||
|
redis,
|
||||||
|
pg,
|
||||||
|
metrics,
|
||||||
|
};
|
||||||
|
|
||||||
|
let app = Router::new()
|
||||||
|
.route("/healthz", get(healthz))
|
||||||
|
.route("/metrics", get(metrics_endpoint))
|
||||||
|
.route("/v1/token/dev", post(issue_dev_token))
|
||||||
|
.route("/v1/token/validate", post(validate_token))
|
||||||
|
.with_state(state);
|
||||||
|
|
||||||
|
let listener = tokio::net::TcpListener::bind(&bind)
|
||||||
|
.await
|
||||||
|
.with_context(|| format!("bind {bind}"))?;
|
||||||
|
let local_addr: SocketAddr = listener.local_addr()?;
|
||||||
|
info!(addr = %local_addr, "auth-api listening");
|
||||||
|
axum::serve(listener, app).await?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn connect_postgres() -> Result<Option<Arc<PgClient>>> {
|
||||||
|
let Some(url) = std::env::var("DATABASE_URL").ok() else {
|
||||||
|
return Ok(None);
|
||||||
|
};
|
||||||
|
let (client, conn) = tokio_postgres::connect(&url, NoTls)
|
||||||
|
.await
|
||||||
|
.context("connect postgres")?;
|
||||||
|
tokio::spawn(async move {
|
||||||
|
if let Err(e) = conn.await {
|
||||||
|
warn!(error = %e, "postgres connection task ended");
|
||||||
|
}
|
||||||
|
});
|
||||||
|
info!("auth-api connected to postgres");
|
||||||
|
Ok(Some(Arc::new(client)))
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn healthz() -> &'static str {
|
||||||
|
"ok"
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn metrics_endpoint(State(state): State<AppState>) -> impl IntoResponse {
|
||||||
|
state.metrics.render()
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tracing::instrument(skip(state))]
|
||||||
|
async fn issue_dev_token(
|
||||||
|
State(state): State<AppState>,
|
||||||
|
Json(req): Json<DevTokenRequest>,
|
||||||
|
) -> Result<Json<TokenResponse>, ApiError> {
|
||||||
|
let started = Instant::now();
|
||||||
|
let now = Utc::now();
|
||||||
|
let exp = now + Duration::hours(24);
|
||||||
|
let claims = Claims {
|
||||||
|
sub: req.user_id,
|
||||||
|
tier: req.tier,
|
||||||
|
max_tunnels: req.max_tunnels,
|
||||||
|
exp: exp.timestamp() as usize,
|
||||||
|
iat: now.timestamp() as usize,
|
||||||
|
jti: format!("jti-{}", fastrand::u64(..)),
|
||||||
|
};
|
||||||
|
|
||||||
|
let token = encode(
|
||||||
|
&Header::default(),
|
||||||
|
&claims,
|
||||||
|
&EncodingKey::from_secret(state.jwt_secret.as_bytes()),
|
||||||
|
)
|
||||||
|
.map_err(ApiError::internal)?;
|
||||||
|
|
||||||
|
sync_jwt_cache(&state, &claims).await?;
|
||||||
|
metrics::counter!("auth_jwt_issued_total").increment(1);
|
||||||
|
metrics::histogram!("auth_issue_token_latency_ms")
|
||||||
|
.record(started.elapsed().as_secs_f64() * 1000.0);
|
||||||
|
|
||||||
|
Ok(Json(TokenResponse {
|
||||||
|
token,
|
||||||
|
expires_at: exp.timestamp(),
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tracing::instrument(skip(state, req))]
|
||||||
|
async fn validate_token(
|
||||||
|
State(state): State<AppState>,
|
||||||
|
Json(req): Json<ValidateRequest>,
|
||||||
|
) -> Result<Json<ValidateResponse>, ApiError> {
|
||||||
|
let started = Instant::now();
|
||||||
|
let decoded = decode::<Claims>(
|
||||||
|
&req.token,
|
||||||
|
&DecodingKey::from_secret(state.jwt_secret.as_bytes()),
|
||||||
|
&Validation::new(Algorithm::HS256),
|
||||||
|
);
|
||||||
|
|
||||||
|
let response = match decoded {
|
||||||
|
Ok(tok) => {
|
||||||
|
let claims = tok.claims;
|
||||||
|
let ent = match load_plan_for_user(&state, &claims.sub).await? {
|
||||||
|
Some(db_plan) => db_plan,
|
||||||
|
None => PlanEntitlement {
|
||||||
|
tier: claims.tier.clone(),
|
||||||
|
max_tunnels: claims.max_tunnels,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
sync_plan_cache(&state, &claims.sub, &ent, "auth-validate").await?;
|
||||||
|
metrics::counter!("auth_token_validate_total", "result" => "valid").increment(1);
|
||||||
|
ValidateResponse {
|
||||||
|
valid: true,
|
||||||
|
user_id: Some(claims.sub),
|
||||||
|
tier: Some(ent.tier),
|
||||||
|
max_tunnels: Some(ent.max_tunnels),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(_) => {
|
||||||
|
metrics::counter!("auth_token_validate_total", "result" => "invalid").increment(1);
|
||||||
|
ValidateResponse {
|
||||||
|
valid: false,
|
||||||
|
user_id: None,
|
||||||
|
tier: None,
|
||||||
|
max_tunnels: None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
metrics::histogram!("auth_validate_token_latency_ms")
|
||||||
|
.record(started.elapsed().as_secs_f64() * 1000.0);
|
||||||
|
Ok(Json(response))
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn sync_jwt_cache(state: &AppState, claims: &Claims) -> Result<(), ApiError> {
|
||||||
|
if let Some(mut redis) = state.redis.clone() {
|
||||||
|
let key = format!("auth:jwt:jti:{}", claims.jti);
|
||||||
|
let ttl = (claims.exp as i64 - Utc::now().timestamp()).max(1);
|
||||||
|
let payload = serde_json::json!({
|
||||||
|
"user_id": claims.sub,
|
||||||
|
"plan_tier": claims.tier,
|
||||||
|
"max_tunnels": claims.max_tunnels
|
||||||
|
})
|
||||||
|
.to_string();
|
||||||
|
let _: () = redis
|
||||||
|
.set_ex(key, payload, ttl as u64)
|
||||||
|
.await
|
||||||
|
.map_err(ApiError::internal)?;
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn sync_plan_cache(
|
||||||
|
state: &AppState,
|
||||||
|
user_id: &str,
|
||||||
|
ent: &PlanEntitlement,
|
||||||
|
source: &str,
|
||||||
|
) -> Result<(), ApiError> {
|
||||||
|
if let Some(mut redis) = state.redis.clone() {
|
||||||
|
let key = format!("plan:user:{user_id}");
|
||||||
|
let payload = serde_json::json!({
|
||||||
|
"tier": ent.tier,
|
||||||
|
"max_tunnels": ent.max_tunnels,
|
||||||
|
"source": source,
|
||||||
|
"updated_at": Utc::now().timestamp()
|
||||||
|
})
|
||||||
|
.to_string();
|
||||||
|
let _: () = redis.set_ex(key, payload, 300).await.map_err(ApiError::internal)?;
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn load_plan_for_user(state: &AppState, user_id: &str) -> Result<Option<PlanEntitlement>, ApiError> {
|
||||||
|
let Some(pg) = &state.pg else {
|
||||||
|
return Ok(None);
|
||||||
|
};
|
||||||
|
let _ = Uuid::parse_str(user_id).map_err(ApiError::bad_request)?;
|
||||||
|
let row = pg
|
||||||
|
.query_opt(
|
||||||
|
r#"
|
||||||
|
select p.id as plan_id, p.max_tunnels
|
||||||
|
from subscriptions s
|
||||||
|
join plans p on p.id = s.plan_id
|
||||||
|
where s.user_id = $1::uuid and s.status in ('active', 'trialing')
|
||||||
|
order by s.updated_at desc
|
||||||
|
limit 1
|
||||||
|
"#,
|
||||||
|
&[&user_id],
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.map_err(ApiError::internal)?;
|
||||||
|
|
||||||
|
Ok(row.map(|r| PlanEntitlement {
|
||||||
|
tier: r.get::<_, String>("plan_id"),
|
||||||
|
max_tunnels: r.get::<_, i32>("max_tunnels") as u32,
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
struct ApiError {
|
||||||
|
status: StatusCode,
|
||||||
|
message: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ApiError {
|
||||||
|
fn internal<E: std::fmt::Display>(e: E) -> Self {
|
||||||
|
Self {
|
||||||
|
status: StatusCode::INTERNAL_SERVER_ERROR,
|
||||||
|
message: e.to_string(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn bad_request<E: std::fmt::Display>(e: E) -> Self {
|
||||||
|
Self {
|
||||||
|
status: StatusCode::BAD_REQUEST,
|
||||||
|
message: e.to_string(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
impl IntoResponse for ApiError {
|
||||||
|
fn into_response(self) -> axum::response::Response {
|
||||||
|
(self.status, self.message).into_response()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn default_tier() -> String {
|
||||||
|
"free".to_string()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn default_max_tunnels() -> u32 {
|
||||||
|
1
|
||||||
|
}
|
||||||
11
client/Cargo.toml
Normal file
11
client/Cargo.toml
Normal file
@@ -0,0 +1,11 @@
|
|||||||
|
[package]
|
||||||
|
name = "client"
|
||||||
|
version = "0.1.0"
|
||||||
|
edition = "2024"
|
||||||
|
|
||||||
|
[dependencies]
|
||||||
|
anyhow.workspace = true
|
||||||
|
tokio.workspace = true
|
||||||
|
tracing.workspace = true
|
||||||
|
tracing-subscriber.workspace = true
|
||||||
|
common = { path = "../common" }
|
||||||
254
client/src/main.rs
Normal file
254
client/src/main.rs
Normal file
@@ -0,0 +1,254 @@
|
|||||||
|
use std::{collections::HashMap, sync::Arc, time::Duration};
|
||||||
|
|
||||||
|
use anyhow::{Context, Result};
|
||||||
|
use common::{
|
||||||
|
codec::{read_frame, write_frame},
|
||||||
|
protocol::{
|
||||||
|
ClientFrame, Heartbeat, RegisterRequest, ServerFrame, StreamClosed, StreamData,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
use tokio::{
|
||||||
|
io::{AsyncReadExt, AsyncWriteExt},
|
||||||
|
net::TcpStream,
|
||||||
|
sync::{RwLock, mpsc},
|
||||||
|
time::{MissedTickBehavior, sleep},
|
||||||
|
};
|
||||||
|
use tracing::{error, info, warn};
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
struct ClientConfig {
|
||||||
|
relay_addr: String,
|
||||||
|
token: String,
|
||||||
|
region: String,
|
||||||
|
local_addr: String,
|
||||||
|
requested_subdomain: Option<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ClientConfig {
|
||||||
|
fn from_env() -> Self {
|
||||||
|
Self {
|
||||||
|
relay_addr: std::env::var("DVV_RELAY_ADDR").unwrap_or_else(|_| "127.0.0.1:7000".into()),
|
||||||
|
token: std::env::var("DVV_TOKEN").unwrap_or_else(|_| "dev-token-local".into()),
|
||||||
|
region: std::env::var("DVV_REGION").unwrap_or_else(|_| "eu".into()),
|
||||||
|
local_addr: std::env::var("DVV_LOCAL_ADDR").unwrap_or_else(|_| "127.0.0.1:25565".into()),
|
||||||
|
requested_subdomain: std::env::var("DVV_SUBDOMAIN").ok(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type StreamSinkMap = Arc<RwLock<HashMap<String, mpsc::Sender<Vec<u8>>>>>;
|
||||||
|
|
||||||
|
#[tokio::main]
|
||||||
|
async fn main() -> Result<()> {
|
||||||
|
tracing_subscriber::fmt()
|
||||||
|
.with_env_filter(
|
||||||
|
tracing_subscriber::EnvFilter::try_from_default_env()
|
||||||
|
.unwrap_or_else(|_| "client=info".into()),
|
||||||
|
)
|
||||||
|
.init();
|
||||||
|
|
||||||
|
let cfg = ClientConfig::from_env();
|
||||||
|
let mut backoff = Duration::from_millis(300);
|
||||||
|
|
||||||
|
loop {
|
||||||
|
match run_session(cfg.clone()).await {
|
||||||
|
Ok(()) => warn!("session ended; reconnecting"),
|
||||||
|
Err(e) => error!(error = %e, "session failed"),
|
||||||
|
}
|
||||||
|
sleep(backoff).await;
|
||||||
|
backoff = (backoff * 2).min(Duration::from_secs(10));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn run_session(cfg: ClientConfig) -> Result<()> {
|
||||||
|
let stream = TcpStream::connect(&cfg.relay_addr)
|
||||||
|
.await
|
||||||
|
.with_context(|| format!("connect relay {}", cfg.relay_addr))?;
|
||||||
|
let (mut reader, mut writer) = stream.into_split();
|
||||||
|
|
||||||
|
write_frame(
|
||||||
|
&mut writer,
|
||||||
|
&ClientFrame::Register(RegisterRequest {
|
||||||
|
token: cfg.token.clone(),
|
||||||
|
region: cfg.region.clone(),
|
||||||
|
requested_subdomain: cfg.requested_subdomain.clone(),
|
||||||
|
local_addr: cfg.local_addr.clone(),
|
||||||
|
}),
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
let accepted = match read_frame::<_, ServerFrame>(&mut reader)
|
||||||
|
.await
|
||||||
|
.context("read register response")?
|
||||||
|
{
|
||||||
|
ServerFrame::RegisterAccepted(ok) => ok,
|
||||||
|
ServerFrame::RegisterRejected { reason } => anyhow::bail!("register rejected: {reason}"),
|
||||||
|
other => anyhow::bail!("unexpected frame at register: {other:?}"),
|
||||||
|
};
|
||||||
|
|
||||||
|
info!(
|
||||||
|
fqdn = %accepted.fqdn,
|
||||||
|
session_id = %accepted.session_id,
|
||||||
|
owner = %accepted.owner_instance_id,
|
||||||
|
"registered tunnel"
|
||||||
|
);
|
||||||
|
|
||||||
|
let sinks: StreamSinkMap = Arc::new(RwLock::new(HashMap::new()));
|
||||||
|
let (out_tx, mut out_rx) = mpsc::channel::<ClientFrame>(1024);
|
||||||
|
|
||||||
|
let writer_task = tokio::spawn(async move {
|
||||||
|
while let Some(frame) = out_rx.recv().await {
|
||||||
|
write_frame(&mut writer, &frame).await?;
|
||||||
|
}
|
||||||
|
Ok::<(), anyhow::Error>(())
|
||||||
|
});
|
||||||
|
|
||||||
|
let hb_tx = out_tx.clone();
|
||||||
|
let hb_sinks = sinks.clone();
|
||||||
|
let hb_session_id = accepted.session_id.clone();
|
||||||
|
let hb_interval = Duration::from_secs(accepted.heartbeat_interval_secs.max(1));
|
||||||
|
let hb_task = tokio::spawn(async move {
|
||||||
|
let mut ticker = tokio::time::interval(hb_interval);
|
||||||
|
ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);
|
||||||
|
loop {
|
||||||
|
ticker.tick().await;
|
||||||
|
let active_streams = hb_sinks.read().await.len() as u32;
|
||||||
|
let frame = ClientFrame::Heartbeat(Heartbeat {
|
||||||
|
session_id: hb_session_id.clone(),
|
||||||
|
active_streams,
|
||||||
|
bytes_in: 0,
|
||||||
|
bytes_out: 0,
|
||||||
|
});
|
||||||
|
if hb_tx.send(frame).await.is_err() {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
loop {
|
||||||
|
let frame: ServerFrame = read_frame(&mut reader).await?;
|
||||||
|
match frame {
|
||||||
|
ServerFrame::Ping => {
|
||||||
|
let _ = out_tx.send(ClientFrame::Pong).await;
|
||||||
|
}
|
||||||
|
ServerFrame::IncomingTcp(incoming) => {
|
||||||
|
let cfg_clone = cfg.clone();
|
||||||
|
let out_tx_clone = out_tx.clone();
|
||||||
|
let sinks_clone = sinks.clone();
|
||||||
|
tokio::spawn(async move {
|
||||||
|
if let Err(e) = handle_incoming_stream(cfg_clone, incoming, out_tx_clone, sinks_clone).await {
|
||||||
|
warn!(error = %e, "incoming stream handling failed");
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
ServerFrame::StreamData(StreamData { stream_id, data }) => {
|
||||||
|
if let Some(tx) = sinks.read().await.get(&stream_id).cloned() {
|
||||||
|
if tx.send(data).await.is_err() {
|
||||||
|
sinks.write().await.remove(&stream_id);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ServerFrame::StreamClosed(StreamClosed { stream_id, .. }) => {
|
||||||
|
sinks.write().await.remove(&stream_id);
|
||||||
|
}
|
||||||
|
ServerFrame::DrainNotice { retry_after_ms, reason } => {
|
||||||
|
warn!(retry_after_ms, reason = %reason, "relay draining");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
ServerFrame::Error { message } => warn!(message = %message, "server error frame"),
|
||||||
|
ServerFrame::RegisterAccepted(_) | ServerFrame::RegisterRejected { .. } => {
|
||||||
|
warn!("ignoring unexpected register response after session start")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
hb_task.abort();
|
||||||
|
writer_task.abort();
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Bridges one relay-originated stream onto a fresh TCP connection to the
/// local Minecraft server.
///
/// Connects to `cfg.local_addr`, replays any bytes the relay already
/// consumed (`incoming.initial_data`), registers a per-stream sink in
/// `sinks` so the session loop can forward relay->local data, and spawns
/// two pump tasks. Returns once the pumps are spawned; each pump removes
/// the sink entry itself when it ends.
async fn handle_incoming_stream(
    cfg: ClientConfig,
    incoming: common::protocol::IncomingTcp,
    out_tx: mpsc::Sender<ClientFrame>,
    sinks: StreamSinkMap,
) -> Result<()> {
    let mut local = TcpStream::connect(&cfg.local_addr)
        .await
        .with_context(|| format!("connect local mc {}", cfg.local_addr))?;

    // Replay bytes the relay read before handing the stream over (e.g. the
    // Minecraft handshake) so the local server sees a complete protocol.
    if !incoming.initial_data.is_empty() {
        local.write_all(&incoming.initial_data).await?;
    }

    let (local_read, local_write) = local.into_split();
    // Bounded channel (128 chunks) applies backpressure on relay->local data.
    let (to_local_tx, to_local_rx) = mpsc::channel::<Vec<u8>>(128);
    sinks.write().await.insert(incoming.stream_id.clone(), to_local_tx);

    // relay -> local pump; drops the sink entry when the writer ends.
    let stream_id = incoming.stream_id.clone();
    let sinks_clone = sinks.clone();
    tokio::spawn(async move {
        if let Err(e) = run_local_writer(local_write, to_local_rx).await {
            warn!(stream_id = %stream_id, error = %e, "local writer ended");
        }
        sinks_clone.write().await.remove(&stream_id);
    });

    // local -> relay pump; tells the relay the stream is done (best effort)
    // and then drops the sink entry.
    let stream_id = incoming.stream_id.clone();
    let out_tx_clone = out_tx.clone();
    let sinks_clone = sinks.clone();
    tokio::spawn(async move {
        if let Err(e) = run_local_reader(local_read, out_tx_clone.clone(), stream_id.clone()).await {
            warn!(stream_id = %stream_id, error = %e, "local reader ended");
        }
        let _ = out_tx_clone
            .send(ClientFrame::StreamClosed(StreamClosed {
                stream_id: stream_id.clone(),
                reason: Some("local_reader_closed".into()),
            }))
            .await;
        sinks_clone.write().await.remove(&stream_id);
    });

    info!(
        stream_id = %incoming.stream_id,
        peer = %incoming.peer_addr,
        hostname = %incoming.hostname,
        local = %cfg.local_addr,
        "connected relay stream to local minecraft"
    );
    Ok(())
}
|
||||||
|
|
||||||
|
async fn run_local_reader(
|
||||||
|
mut reader: tokio::net::tcp::OwnedReadHalf,
|
||||||
|
out_tx: mpsc::Sender<ClientFrame>,
|
||||||
|
stream_id: String,
|
||||||
|
) -> Result<()> {
|
||||||
|
let mut buf = vec![0u8; 16 * 1024];
|
||||||
|
loop {
|
||||||
|
let n = reader.read(&mut buf).await?;
|
||||||
|
if n == 0 {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
out_tx
|
||||||
|
.send(ClientFrame::StreamData(StreamData {
|
||||||
|
stream_id: stream_id.clone(),
|
||||||
|
data: buf[..n].to_vec(),
|
||||||
|
}))
|
||||||
|
.await
|
||||||
|
.context("send local data to relay")?;
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn run_local_writer(
|
||||||
|
mut writer: tokio::net::tcp::OwnedWriteHalf,
|
||||||
|
mut rx: mpsc::Receiver<Vec<u8>>,
|
||||||
|
) -> Result<()> {
|
||||||
|
while let Some(chunk) = rx.recv().await {
|
||||||
|
writer.write_all(&chunk).await?;
|
||||||
|
}
|
||||||
|
writer.shutdown().await.ok();
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
11
common/Cargo.toml
Normal file
11
common/Cargo.toml
Normal file
@@ -0,0 +1,11 @@
|
|||||||
|
[package]
name = "common"
version = "0.1.0"
# Inherit the edition declared once in [workspace.package] so every
# workspace member stays in sync automatically.
edition.workspace = true

[dependencies]
serde.workspace = true
serde_json.workspace = true
thiserror.workspace = true
tokio.workspace = true
uuid.workspace = true
|
||||||
42
common/src/codec.rs
Normal file
42
common/src/codec.rs
Normal file
@@ -0,0 +1,42 @@
|
|||||||
|
use std::io;
|
||||||
|
|
||||||
|
use serde::{Serialize, de::DeserializeOwned};
|
||||||
|
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
|
||||||
|
|
||||||
|
/// Upper bound (1 MiB) on a single frame's serialized payload, enforced on
/// both the write and read paths.
const MAX_FRAME_SIZE: usize = 1024 * 1024;
|
||||||
|
|
||||||
|
pub async fn write_frame<W, T>(writer: &mut W, value: &T) -> io::Result<()>
|
||||||
|
where
|
||||||
|
W: AsyncWrite + Unpin,
|
||||||
|
T: Serialize,
|
||||||
|
{
|
||||||
|
let payload = serde_json::to_vec(value)
|
||||||
|
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e.to_string()))?;
|
||||||
|
if payload.len() > MAX_FRAME_SIZE {
|
||||||
|
return Err(io::Error::new(
|
||||||
|
io::ErrorKind::InvalidInput,
|
||||||
|
"frame exceeds max size",
|
||||||
|
));
|
||||||
|
}
|
||||||
|
writer.write_u32(payload.len() as u32).await?;
|
||||||
|
writer.write_all(&payload).await?;
|
||||||
|
writer.flush().await
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn read_frame<R, T>(reader: &mut R) -> io::Result<T>
|
||||||
|
where
|
||||||
|
R: AsyncRead + Unpin,
|
||||||
|
T: DeserializeOwned,
|
||||||
|
{
|
||||||
|
let len = reader.read_u32().await? as usize;
|
||||||
|
if len > MAX_FRAME_SIZE {
|
||||||
|
return Err(io::Error::new(
|
||||||
|
io::ErrorKind::InvalidData,
|
||||||
|
"frame too large",
|
||||||
|
));
|
||||||
|
}
|
||||||
|
let mut payload = vec![0u8; len];
|
||||||
|
reader.read_exact(&mut payload).await?;
|
||||||
|
serde_json::from_slice(&payload)
|
||||||
|
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e.to_string()))
|
||||||
|
}
|
||||||
3
common/src/lib.rs
Normal file
3
common/src/lib.rs
Normal file
@@ -0,0 +1,3 @@
|
|||||||
|
/// Length-prefixed JSON frame codec used on control connections.
pub mod codec;
/// Minimal Minecraft handshake parsing (hostname extraction + replay bytes).
pub mod minecraft;
/// Wire protocol types shared by client and relay.
pub mod protocol;
|
||||||
106
common/src/minecraft.rs
Normal file
106
common/src/minecraft.rs
Normal file
@@ -0,0 +1,106 @@
|
|||||||
|
use std::io;
|
||||||
|
|
||||||
|
use tokio::io::{AsyncRead, AsyncReadExt};
|
||||||
|
|
||||||
|
pub async fn read_handshake_hostname<R>(reader: &mut R) -> io::Result<String>
|
||||||
|
where
|
||||||
|
R: AsyncRead + Unpin,
|
||||||
|
{
|
||||||
|
let (hostname, _) = read_handshake_hostname_and_bytes(reader).await?;
|
||||||
|
Ok(hostname)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Parses a complete Minecraft handshake packet (packet id 0x00) from
/// `reader` and returns the hostname the player connected with, together
/// with the exact raw bytes consumed (length prefix included) so callers
/// can replay the handshake verbatim to the real server.
///
/// Errors with `InvalidData` on malformed packets and `UnexpectedEof` if
/// the packet body is truncated.
pub async fn read_handshake_hostname_and_bytes<R>(reader: &mut R) -> io::Result<(String, Vec<u8>)>
where
    R: AsyncRead + Unpin,
{
    // Packet framing: VarInt body length first; keep its encoded bytes so
    // the returned raw buffer is byte-exact.
    let (packet_len, packet_len_bytes) = read_varint_async_with_bytes(reader).await?;
    let packet_len = packet_len as usize;
    // A handshake is small; bound the allocation before reading the body.
    if packet_len == 0 || packet_len > 2048 {
        return Err(io::Error::new(
            io::ErrorKind::InvalidData,
            "invalid minecraft handshake packet length",
        ));
    }

    let mut buf = vec![0u8; packet_len];
    reader.read_exact(&mut buf).await?;
    let mut cur = &buf[..];

    let packet_id = read_varint_slice(&mut cur)?;
    if packet_id != 0 {
        return Err(io::Error::new(
            io::ErrorKind::InvalidData,
            "expected handshake packet id 0",
        ));
    }

    // Remaining handshake fields; only the hostname is used, but each field
    // must still be consumed in wire order.
    let _protocol_version = read_varint_slice(&mut cur)?;
    let host = read_string_slice(&mut cur, 255)?;
    if cur.len() < 2 {
        return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "missing port"));
    }
    let _port = u16::from_be_bytes([cur[0], cur[1]]);
    cur = &cur[2..];
    let _next_state = read_varint_slice(&mut cur)?;

    // Reassemble length prefix + body for verbatim replay.
    let mut raw = packet_len_bytes;
    raw.extend_from_slice(&buf);
    Ok((host, raw))
}
|
||||||
|
|
||||||
|
async fn read_varint_async_with_bytes<R>(reader: &mut R) -> io::Result<(i32, Vec<u8>)>
|
||||||
|
where
|
||||||
|
R: AsyncRead + Unpin,
|
||||||
|
{
|
||||||
|
let mut num_read = 0;
|
||||||
|
let mut result = 0i32;
|
||||||
|
let mut raw = Vec::with_capacity(5);
|
||||||
|
loop {
|
||||||
|
let byte = reader.read_u8().await?;
|
||||||
|
raw.push(byte);
|
||||||
|
let value = (byte & 0x7F) as i32;
|
||||||
|
result |= value << (7 * num_read);
|
||||||
|
num_read += 1;
|
||||||
|
if num_read > 5 {
|
||||||
|
return Err(io::Error::new(io::ErrorKind::InvalidData, "varint too big"));
|
||||||
|
}
|
||||||
|
if (byte & 0x80) == 0 {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok((result, raw))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Decodes a Minecraft protocol VarInt from the front of `input`, advancing
/// the slice past the consumed bytes.
///
/// Returns `UnexpectedEof` if the slice runs out mid-value and
/// `InvalidData` if the encoding uses more than the 5 bytes a VarInt may
/// occupy.
fn read_varint_slice(input: &mut &[u8]) -> io::Result<i32> {
    let mut num_read = 0;
    let mut result = 0i32;
    loop {
        if input.is_empty() {
            return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "eof"));
        }
        let byte = input[0];
        *input = &input[1..];
        // Bound-check BEFORE shifting: a 6th byte would need `value << 35`,
        // and a shift amount >= 32 on i32 panics in debug builds. The
        // previous order performed the shift first, so malformed input
        // could panic instead of returning Err.
        if num_read >= 5 {
            return Err(io::Error::new(io::ErrorKind::InvalidData, "varint too big"));
        }
        let value = (byte & 0x7F) as i32;
        result |= value << (7 * num_read);
        num_read += 1;
        if (byte & 0x80) == 0 {
            break;
        }
    }
    Ok(result)
}
|
||||||
|
|
||||||
|
fn read_string_slice(input: &mut &[u8], max_len: usize) -> io::Result<String> {
|
||||||
|
let len = read_varint_slice(input)? as usize;
|
||||||
|
if len > max_len || input.len() < len {
|
||||||
|
return Err(io::Error::new(io::ErrorKind::InvalidData, "bad string len"));
|
||||||
|
}
|
||||||
|
let bytes = &input[..len];
|
||||||
|
*input = &input[len..];
|
||||||
|
String::from_utf8(bytes.to_vec())
|
||||||
|
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e.to_string()))
|
||||||
|
}
|
||||||
105
common/src/protocol.rs
Normal file
105
common/src/protocol.rs
Normal file
@@ -0,0 +1,105 @@
|
|||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
|
/// First frame a tunnel client sends: authenticate and request a tunnel.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RegisterRequest {
    pub token: String,
    pub region: String,
    /// Preferred subdomain; `None` leaves the choice to the relay.
    pub requested_subdomain: Option<String>,
    pub local_addr: String,
}

/// Relay's positive reply to a `RegisterRequest`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RegisterAccepted {
    pub session_id: String,
    /// Fully qualified domain name players should connect to.
    pub fqdn: String,
    /// Interval at which the client is expected to send `Heartbeat` frames.
    pub heartbeat_interval_secs: u64,
    pub owner_instance_id: String,
}

/// Periodic client -> relay liveness and usage report.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Heartbeat {
    pub session_id: String,
    pub active_streams: u32,
    pub bytes_in: u64,
    pub bytes_out: u64,
}

/// Relay -> client: a new TCP connection arrived for this tunnel.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IncomingTcp {
    pub stream_id: String,
    pub session_id: String,
    pub peer_addr: String,
    pub hostname: String,
    /// Bytes the relay already consumed (e.g. the Minecraft handshake);
    /// the client must replay them to its local server.
    pub initial_data: Vec<u8>,
}

/// A chunk of stream payload, used in both directions.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamData {
    pub stream_id: String,
    pub data: Vec<u8>,
}

/// Signals that one multiplexed stream has ended.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamClosed {
    pub stream_id: String,
    pub reason: Option<String>,
}

/// Relay-to-relay stream handoff header (carried by `R2rFrame::Open`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RelayForwardPrelude {
    pub version: u8,
    pub session_id: String,
    pub fqdn: String,
    pub stream_id: String,
    pub peer_addr: String,
    pub origin_instance_id: String,
    /// Number of relay hops so far — presumably used to bound forwarding
    /// loops; confirm against the relay implementation.
    pub hop_count: u8,
    pub initial_data: Vec<u8>,
}

/// Relay-to-relay stream payload.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct R2rStreamData {
    pub session_id: String,
    pub stream_id: String,
    pub data: Vec<u8>,
}

/// Relay-to-relay stream teardown.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct R2rStreamClosed {
    pub session_id: String,
    pub stream_id: String,
    pub reason: Option<String>,
}

/// Frames exchanged between relay instances. Serialized as adjacently
/// tagged JSON: `{"type": ..., "data": ...}`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", content = "data")]
pub enum R2rFrame {
    Open(RelayForwardPrelude),
    Data(R2rStreamData),
    Close(R2rStreamClosed),
    Ping,
    Pong,
}

/// Frames sent from the tunnel client to the relay.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", content = "data")]
pub enum ClientFrame {
    Register(RegisterRequest),
    Heartbeat(Heartbeat),
    StreamData(StreamData),
    StreamClosed(StreamClosed),
    /// Reply to `ServerFrame::Ping`.
    Pong,
}

/// Frames sent from the relay to the tunnel client.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", content = "data")]
pub enum ServerFrame {
    RegisterAccepted(RegisterAccepted),
    RegisterRejected { reason: String },
    IncomingTcp(IncomingTcp),
    StreamData(StreamData),
    StreamClosed(StreamClosed),
    Ping,
    /// Relay is draining; carries a suggested reconnect delay.
    DrainNotice { retry_after_ms: u64, reason: String },
    Error { message: String },
}
|
||||||
70
db/schema.sql
Normal file
70
db/schema.sql
Normal file
@@ -0,0 +1,70 @@
|
|||||||
|
-- PostgreSQL schema scaffold for dvv tunnel SaaS platform.

-- pgcrypto supplies gen_random_uuid(), used for the uuid primary keys below.
create extension if not exists pgcrypto;

-- User accounts. password_hash is nullable — presumably for accounts
-- authenticated externally; confirm against auth-api.
create table if not exists users (
    id uuid primary key default gen_random_uuid(),
    email text unique not null,
    password_hash text,
    created_at timestamptz not null default now(),
    updated_at timestamptz not null default now()
);

-- Plan catalog: per-tier quotas and feature flags. NULL limits mean
-- "no limit" (see the seed rows at the bottom of this file).
create table if not exists plans (
    id text primary key,
    name text not null,
    max_tunnels integer not null,
    custom_subdomain boolean not null default false,
    idle_timeout_seconds integer,
    bandwidth_cap_kbps integer,
    priority_routing boolean not null default false,
    created_at timestamptz not null default now()
);

-- Billing state mirrored from the payment provider (defaults to 'stripe').
create table if not exists subscriptions (
    id uuid primary key default gen_random_uuid(),
    user_id uuid not null references users(id) on delete cascade,
    plan_id text not null references plans(id),
    provider text not null default 'stripe',
    provider_customer_id text,
    provider_subscription_id text,
    status text not null,
    current_period_end timestamptz,
    created_at timestamptz not null default now(),
    updated_at timestamptz not null default now()
);

create index if not exists subscriptions_user_id_idx on subscriptions(user_id);
-- Partial unique index: one row per provider subscription, while still
-- allowing many rows whose provider_subscription_id is NULL.
create unique index if not exists subscriptions_provider_subscription_id_uidx
    on subscriptions(provider_subscription_id)
    where provider_subscription_id is not null;

-- Tunnel registrations. (region, subdomain) is the externally visible
-- identity, so it is unique per region.
create table if not exists tunnels (
    id uuid primary key default gen_random_uuid(),
    user_id uuid not null references users(id) on delete cascade,
    region text not null check (region in ('eu', 'us', 'asia')),
    subdomain text not null,
    custom_domain boolean not null default false,
    active boolean not null default true,
    created_at timestamptz not null default now(),
    updated_at timestamptz not null default now(),
    unique (region, subdomain)
);

create index if not exists tunnels_user_id_idx on tunnels(user_id);

-- Hourly per-user traffic rollups, keyed by (user, hour bucket) —
-- presumably upserted by relays or a rollup job; confirm against writers.
create table if not exists usage_rollups_hourly (
    user_id uuid not null references users(id) on delete cascade,
    hour_bucket timestamptz not null,
    bytes_in bigint not null default 0,
    bytes_out bigint not null default 0,
    player_connections integer not null default 0,
    primary key (user_id, hour_bucket)
);

-- Seed the plan catalog; idempotent thanks to the conflict clause.
insert into plans (id, name, max_tunnels, custom_subdomain, idle_timeout_seconds, bandwidth_cap_kbps, priority_routing)
values
    ('free', 'Free', 1, false, 900, 512, false),
    ('pro', 'Pro', 5, true, null, 8192, true),
    ('team', 'Team', 25, true, null, null, true)
on conflict (id) do nothing;
|
||||||
32
docker-compose.yml
Normal file
32
docker-compose.yml
Normal file
@@ -0,0 +1,32 @@
|
|||||||
|
# Local development stack: Redis (shared state), Postgres (control plane),
# and the auth API. NOTE: the obsolete top-level `version:` key was removed —
# the Compose Specification ignores it and Compose V2 warns about it.
services:
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"

  postgres:
    image: postgres:16-alpine
    environment:
      POSTGRES_USER: dvv
      POSTGRES_PASSWORD: dvv
      POSTGRES_DB: dvv
    ports:
      - "5432:5432"
    volumes:
      # Init script runs only on first start of an empty data directory.
      - ./db/schema.sql:/docker-entrypoint-initdb.d/001-schema.sql:ro

  auth-api:
    build:
      context: .
      dockerfile: Dockerfile.auth-api
    environment:
      AUTH_BIND: 0.0.0.0:8080
      # Dev-only secret; override in any non-local environment.
      JWT_SECRET: dev-secret-change-me
      REDIS_URL: redis://redis:6379
    depends_on:
      - redis
    ports:
      - "8080:8080"
|
||||||
|
|
||||||
19
relay/Cargo.toml
Normal file
19
relay/Cargo.toml
Normal file
@@ -0,0 +1,19 @@
|
|||||||
|
[package]
name = "relay"
version = "0.1.0"
# Inherit the edition declared once in [workspace.package] so every
# workspace member stays in sync automatically.
edition.workspace = true

[dependencies]
anyhow.workspace = true
chrono.workspace = true
common = { path = "../common" }
fastrand.workspace = true
metrics.workspace = true
metrics-exporter-prometheus.workspace = true
redis.workspace = true
serde.workspace = true
serde_json.workspace = true
tokio.workspace = true
tracing.workspace = true
tracing-subscriber.workspace = true
uuid.workspace = true
|
||||||
1473
relay/src/main.rs
Normal file
1473
relay/src/main.rs
Normal file
File diff suppressed because it is too large
Load Diff
Reference in New Issue
Block a user