diff --git a/README.md b/README.md index 873401a..5fba750 100644 --- a/README.md +++ b/README.md @@ -3,7 +3,7 @@ A fast and stable reverse proxy for NAT traversal, written in Rust -rathole, like frp, can help to expose the service on the device behind the NAT to the Internet, via a server with a public IP. +rathole, like [frp](https://github.com/fatedier/frp), can help to expose the service on the device behind the NAT to the Internet, via a server with a public IP. ## Quickstart @@ -62,7 +62,7 @@ remote_addr = "example.com:2333" # Necessary. The address of the server default_token = "default_token_if_not_specify" # Optional. The default token of services, if they don't define their own ones [client.transport] -type = "tcp" # Necessary if multiple transport blocks present. Possibile values: ["tcp", "tls"]. Default: "tcp" +type = "tcp" # Optional. Possibile values: ["tcp", "tls"]. Default: "tcp" [client.transport.tls] # Necessary if `type` is "tls" trusted_root = "ca.pem" # Necessary. The certificate of CA that signed the server's certificate hostname = "example.com" # Optional. The hostname that the client uses to validate the certificate. If not set, fallback to `client.remote_addr` @@ -80,7 +80,7 @@ default_token = "default_token_if_not_specify" # Optional [server.transport] type = "tcp" # Same as `[client.transport]` -[server.transport.tls] +[server.transport.tls] # Necessary if `type` is "tls" pkcs12 = "identify.pfx" # Necessary. pkcs12 file of server's certificate and private key pkcs12_password = "password" # Necessary. Password of the pkcs12 file @@ -92,9 +92,9 @@ bind_addr = "0.0.0.0:8081" # Necessary. The address of the service is exposed at bind_addr = "0.0.0.1:8082" ``` -# Benchmark +## Benchmark -rathole has similiar latency to frp, but can handle more connections. Also it can provide much better bandwidth than frp. +rathole has similiar latency to [frp](https://github.com/fatedier/frp), but can handle more connections. Also it can provide much better bandwidth than frp. See also [Benchmark](./doc/benchmark.md). @@ -102,10 +102,10 @@ See also [Benchmark](./doc/benchmark.md). ![tcp_latency](./doc/img/tcp_latency.svg) -# Development +## Development Status `rathole` is in active development. A load of features is on the way: - +- [x] TLS support - [ ] UDP support - [ ] Hot reloading - [ ] HTTP APIs for configuration diff --git a/src/client.rs b/src/client.rs index 25e6f1c..968a37f 100644 --- a/src/client.rs +++ b/src/client.rs @@ -17,6 +17,7 @@ use tokio::sync::oneshot; use tokio::time::{self, Duration}; use tracing::{debug, error, info, instrument, Instrument, Span}; +// The entrypoint of running a client pub async fn run_client(config: &Config) -> Result<()> { let config = match &config.client { Some(v) => v, @@ -40,6 +41,7 @@ pub async fn run_client(config: &Config) -> Result<()> { type ServiceDigest = protocol::Digest; type Nonce = protocol::Digest; +// Holds the state of a client struct Client<'a, T: Transport> { config: &'a ClientConfig, service_handles: HashMap, @@ -47,6 +49,7 @@ struct Client<'a, T: Transport> { } impl<'a, T: 'static + Transport> Client<'a, T> { + // Create a Client from `[client]` config block async fn from(config: &'a ClientConfig) -> Result> { Ok(Client { config, @@ -55,8 +58,10 @@ impl<'a, T: 'static + Transport> Client<'a, T> { }) } + // The entrypoint of Client async fn run(&mut self) -> Result<()> { for (name, config) in &self.config.services { + // Create a control channel for each service defined let handle = ControlChannelHandle::new( (*config).clone(), self.config.remote_addr.clone(), @@ -65,6 +70,8 @@ impl<'a, T: 'static + Transport> Client<'a, T> { self.service_handles.insert(name.clone(), handle); } + // TODO: Maybe wait for a config change signal for hot reloading + // Wait for the shutdown signal loop { tokio::select! { val = tokio::signal::ctrl_c() => { @@ -130,14 +137,17 @@ async fn run_data_channel(args: Arc>) -> Res Ok(()) } +// Control channel, using T as the transport layer struct ControlChannel { - digest: ServiceDigest, - service: ClientServiceConfig, - shutdown_rx: oneshot::Receiver, - remote_addr: String, - transport: Arc, + digest: ServiceDigest, // SHA256 of the service name + service: ClientServiceConfig, // `[client.services.foo]` config block + shutdown_rx: oneshot::Receiver, // Receives the shutdown signal + remote_addr: String, // `client.remote_addr` + transport: Arc, // Wrapper around the transport layer } +// Handle of a control channel +// Dropping it will also drop the actual control channel struct ControlChannelHandle { shutdown_tx: oneshot::Sender, } diff --git a/src/multi_map.rs b/src/multi_map.rs index 81af822..3eca6f8 100644 --- a/src/multi_map.rs +++ b/src/multi_map.rs @@ -6,6 +6,9 @@ struct RawItem(*mut (K1, K2, V)); unsafe impl Send for RawItem {} unsafe impl Sync for RawItem {} +/// MultiMap is a hash map that can index an item by two keys +/// For example, after an item with key (a, b) is insert, `map.get1(a)` and +/// `map.get2(b)` both returns the item. Likewise the `remove1` and `remove2`. pub struct MultiMap { map1: HashMap, RawItem>, map2: HashMap, RawItem>, diff --git a/src/server.rs b/src/server.rs index d2e7c9c..353b013 100644 --- a/src/server.rs +++ b/src/server.rs @@ -19,12 +19,13 @@ use tokio::sync::{mpsc, oneshot, RwLock}; use tokio::time; use tracing::{debug, error, info, info_span, warn, Instrument}; -type ServiceDigest = protocol::Digest; -type Nonce = protocol::Digest; +type ServiceDigest = protocol::Digest; // SHA256 of a service name +type Nonce = protocol::Digest; // Also called `session_key` -const POOL_SIZE: usize = 64; -const CHAN_SIZE: usize = 2048; +const POOL_SIZE: usize = 64; // The number of cached connections +const CHAN_SIZE: usize = 2048; // The capacity of various chans +// The entrypoint of running a server pub async fn run_server(config: &Config) -> Result<()> { let config = match &config.server { Some(config) => config, @@ -32,6 +33,8 @@ pub async fn run_server(config: &Config) -> Result<()> { return Err(anyhow!("Try to run as a server, but the configuration is missing. Please add the `[server]` block")) } }; + + //TODO: Maybe use a Box here to reduce duplicated code match config.transport.transport_type { TransportType::Tcp => { let mut server = Server::::from(config).await?; @@ -42,17 +45,30 @@ pub async fn run_server(config: &Config) -> Result<()> { server.run().await?; } } + Ok(()) } +// A hash map of ControlChannelHandles, indexed by ServiceDigest or Nonce +// See also MultiMap type ControlChannelMap = MultiMap>; + +// Server holds all states of running a server struct Server<'a, T: Transport> { + // `[server]` config config: &'a ServerConfig, + + // TODO: Maybe the rwlock is unnecessary. + // Keep it until the hot reloading feature is implemented + // `[server.services]` config, indexed by ServiceDigest services: Arc>>, + // Collection of contorl channels control_channels: Arc>>, + // Wrapper around the transport layer transport: Arc, } +// Generate a hash map of services which is indexed by ServiceDigest fn generate_service_hashmap( server_config: &ServerConfig, ) -> HashMap { @@ -64,6 +80,7 @@ fn generate_service_hashmap( } impl<'a, T: 'static + Transport> Server<'a, T> { + // Create a server from `[server]` pub async fn from(config: &'a ServerConfig) -> Result> { Ok(Server { config, @@ -73,7 +90,9 @@ impl<'a, T: 'static + Transport> Server<'a, T> { }) } + // The entry point of Server pub async fn run(&mut self) -> Result<()> { + // Listen at `server.bind_addr` let l = self .transport .bind(&self.config.bind_addr) @@ -88,14 +107,17 @@ impl<'a, T: 'static + Transport> Server<'a, T> { ..Default::default() }; - // Listen for incoming control or data channels + // Wait for connections and shutdown signals loop { tokio::select! { + // Wait for incoming control and data channels ret = self.transport.accept(&l) => { match ret { Err(err) => { + // Detects whether it's an IO error if let Some(err) = err.downcast_ref::() { - // Possibly a EMFILE. So sleep for a while and retry + // If it is an IO error, then it's possibly an + // EMFILE. So sleep for a while and retry if let Some(d) = backoff.next_backoff() { error!("Failed to accept: {}. Retry in {:?}...", err, d); time::sleep(d).await; @@ -105,6 +127,8 @@ impl<'a, T: 'static + Transport> Server<'a, T> { break; } } + // If it's not an IO error, then it comes from + // the transport layer, so just ignore it } Ok((conn, addr)) => { backoff.reset(); @@ -120,6 +144,7 @@ impl<'a, T: 'static + Transport> Server<'a, T> { } } }, + // Wait for the shutdown signal _ = tokio::signal::ctrl_c() => { info!("Shuting down gracefully..."); break; @@ -131,6 +156,7 @@ impl<'a, T: 'static + Transport> Server<'a, T> { } } +// Handle connections to `server.bind_addr` async fn handle_connection( mut conn: T::Stream, addr: SocketAddr, @@ -203,8 +229,16 @@ async fn do_control_channel_handshake( ); bail!("Service {} failed the authentication", service_name); } else { + // TODO: Here could use some refactor: + // 1. Clone the config and drop `services_guard` earlier + // 2. Use the result of `insert` to warn. Then no need to call `remove1` + let mut h = control_channels.write().await; + // If there's already a control channel for the service, then drop the old one. + // Because a control channel doesn't report back when it's dead, + // the handle in the map could be stall, dropping the old handle enables + // the client to reconnect. if let Some(_) = h.remove1(&service_digest) { warn!( "Dropping previous control channel for digest {}", @@ -213,6 +247,8 @@ async fn do_control_channel_handshake( } let service_config = service_config.clone(); + + // Drop the rwlock as soon as possible when we're done with it drop(services_guard); // Send ack @@ -222,7 +258,7 @@ async fn do_control_channel_handshake( info!(service = %service_config.name, "Control channel established"); let handle = ControlChannelHandle::new(conn, service_config); - // Drop the old handle + // Insert the new handle let _ = h.insert(service_digest, session_key, handle); } @@ -242,38 +278,55 @@ async fn do_data_channel_handshake( c_ch.conn_pool.data_ch_tx.send(conn).await?; } None => { + // TODO: Maybe print IP here warn!("Data channel has incorrect nonce"); } } Ok(()) } +// Control channel, using T as the transport layer struct ControlChannel { - conn: T::Stream, - service: ServerServiceConfig, - shutdown_rx: oneshot::Receiver, - visitor_tx: mpsc::Sender, + conn: T::Stream, // The connection of control channel + service: ServerServiceConfig, // A copy of the corresponding service config + shutdown_rx: oneshot::Receiver, // Receives the shutdown signal + visitor_tx: mpsc::Sender, // Receives visitor connections } +// The handle of a control channel, along with the handle of a connection pool +// Dropping it will drop the actual control channel, because `visitor_tx` +// and `shutdown_tx` are closed struct ControlChannelHandle { + // Shutdown the control channel. + // Not used for now, but can be used for hot reloading _shutdown_tx: oneshot::Sender, conn_pool: ConnectionPoolHandle, } impl ControlChannelHandle { + // Create a control channel handle, where the control channel handling task + // and the connection pool task are created. fn new(conn: T::Stream, service: ServerServiceConfig) -> ControlChannelHandle { - let (_shutdown_tx, shutdown_rx) = oneshot::channel::(); + // Save the name string for logging let name = service.name.clone(); + + // Create a shutdown channel. The sender is not used for now, but for future use + let (_shutdown_tx, shutdown_rx) = oneshot::channel::(); + + // Create and run the connection pool, where the visitors and data channels meet let conn_pool = ConnectionPoolHandle::new(); - let actor: ControlChannel = ControlChannel { + + // Create the control channel + let ch: ControlChannel = ControlChannel { conn, shutdown_rx, service, visitor_tx: conn_pool.visitor_tx.clone(), }; + // Run the control channel tokio::spawn(async move { - if let Err(err) = actor.run().await { + if let Err(err) = ch.run().await { error!(%name, "{}", err); } }); @@ -286,8 +339,10 @@ impl ControlChannelHandle { } impl ControlChannel { + // Run a control channel #[tracing::instrument(skip(self), fields(service = %self.service.name))] async fn run(mut self) -> Result<()> { + // Where the service is exposed let l = match TcpListener::bind(&self.service.bind_addr).await { Ok(v) => v, Err(e) => { @@ -303,7 +358,11 @@ impl ControlChannel { info!("Listening at {}", &self.service.bind_addr); + // Each `u8` in the chan indicates a data channel creation request let (data_req_tx, mut data_req_rx) = mpsc::unbounded_channel::(); + + // The control channel is moved into the task, and sends CreateDataChannel + // comamnds to the client when needed tokio::spawn(async move { let cmd = bincode::serialize(&ControlChannelCmd::CreateDataChannel).unwrap(); while data_req_rx.recv().await.is_some() { @@ -313,32 +372,43 @@ impl ControlChannel { } }); + // Cache some data channels for later use for _i in 0..POOL_SIZE { if let Err(e) = data_req_tx.send(0) { error!("Failed to request data channel {}", e); }; } + // Retry at least every 1s let mut backoff = ExponentialBackoff { max_interval: Duration::from_secs(1), max_elapsed_time: None, ..Default::default() }; + + // Wait for visitors and the shutdown signal loop { tokio::select! { + // Wait for visitors val = l.accept() => { match val { Err(e) => { + // `l` is a TCP listener so this must be a IO error + // Possibly a EMFILE. So sleep for a while error!("{}. Sleep for a while", e); if let Some(d) = backoff.next_backoff() { time::sleep(d).await; } else { + // This branch will never be reached for current backoff policy error!("Too many retries. Aborting..."); break; } }, Ok((incoming, addr)) => { + // For every visitor, request to create a data channel if let Err(e) = data_req_tx.send(0) { + // An error indicates the control channel is broken + // So break the loop error!("{}", e); break; }; @@ -347,10 +417,12 @@ impl ControlChannel { debug!("New visitor from {}", addr); + // Send the visitor to the connection pool let _ = self.visitor_tx.send(incoming).await; } } }, + // Wait for the shutdown signal _ = &mut self.shutdown_rx => { break; } diff --git a/src/transport/mod.rs b/src/transport/mod.rs index 468ea25..777ef9f 100644 --- a/src/transport/mod.rs +++ b/src/transport/mod.rs @@ -6,6 +6,7 @@ use std::net::SocketAddr; use tokio::io::{AsyncRead, AsyncWrite}; use tokio::net::ToSocketAddrs; +// Specify a transport layer, like TCP, TLS #[async_trait] pub trait Transport: Debug + Send + Sync { type Acceptor: Send + Sync;