diff --git a/Cargo.lock b/Cargo.lock index ffd27de..1aebac5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -67,6 +67,29 @@ version = "1.0.54" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7a99269dff3bc004caa411f38845c20303f1e393ca2bd6581576fa3a7f59577d" +[[package]] +name = "async-http-proxy" +version = "1.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29faa5d4d308266048bd7505ba55484315a890102f9345b9ff4b87de64201592" +dependencies = [ + "base64", + "httparse", + "thiserror", + "tokio", +] + +[[package]] +name = "async-socks5" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77f634add2445eb2c1f785642a67ca1073fedd71e73dc3ca69435ef9b9bdedc7" +dependencies = [ + "async-trait", + "thiserror", + "tokio", +] + [[package]] name = "async-stream" version = "0.3.2" @@ -1483,6 +1506,8 @@ name = "rathole" version = "0.3.10" dependencies = [ "anyhow", + "async-http-proxy", + "async-socks5", "async-trait", "atty", "backoff", @@ -1505,6 +1530,7 @@ dependencies = [ "toml", "tracing", "tracing-subscriber 0.2.25", + "url", "vergen", ] @@ -2222,6 +2248,7 @@ dependencies = [ "idna", "matches", "percent-encoding", + "serde", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 0995323..0f3a12e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -71,6 +71,9 @@ base64 = { version = "0.13", optional = true } notify = { version = "5.0.0-pre.13", optional = true } console-subscriber = { version = "0.1", optional = true, features = ["parking_lot"] } atty = "0.2" +async-http-proxy = { version = "1.2", features = ["runtime-tokio", "basic-auth"] } +async-socks5 = "0.5" +url = { version = "2.2", features = ["serde"] } [build-dependencies] vergen = { version = "6.0", default-features = false, features = ["build", "git", "cargo"] } diff --git a/README.md b/README.md index cca866e..c844984 100644 --- a/README.md +++ b/README.md @@ -108,6 +108,9 @@ default_token = "default_token_if_not_specify" # Optional. The default token of [client.transport] # The whole block is optional. Specify which transport to use type = "tcp" # Optional. Possible values: ["tcp", "tls", "noise"]. Default: "tcp" + +[client.transport.tcp] # Optional +proxy = "socks5://user:passwd@127.0.0.1:1080" # Optional. Use the proxy to connect to the server nodelay = false # Optional. Determine whether to enable TCP_NODELAY, if applicable, to improve the latency but decrease the bandwidth. Default: false keepalive_secs = 10 # Optional. Specify `tcp_keepalive_time` in `tcp(7)`, if applicable. Default: 10 seconds keepalive_interval = 5 # Optional. Specify `tcp_keepalive_intvl` in `tcp(7)`, if applicable. Default: 5 seconds diff --git a/examples/use_proxy/client.toml b/examples/use_proxy/client.toml new file mode 100644 index 0000000..f902af3 --- /dev/null +++ b/examples/use_proxy/client.toml @@ -0,0 +1,15 @@ +[client] +remote_addr = "127.0.0.1:2333" +default_token = "123" + +[client.services.foo1] +local_addr = "127.0.0.1:80" + +[client.transport] +type = "tcp" +[client.transport.tcp] +# `proxy` controls how the client connect to the server +# Use socks5 proxy at 127.0.0.1, with port 1080, username 'myuser' and password 'mypass' +proxy = "socks5://myuser:mypass@127.0.0.1:1080" +# Use http proxy. Similar to socks5 proxy +# proxy = "http://myuser:mypass@127.0.0.1:8080" diff --git a/src/config.rs b/src/config.rs index 4811769..31e98af 100644 --- a/src/config.rs +++ b/src/config.rs @@ -5,6 +5,7 @@ use std::fmt::{Debug, Formatter}; use std::ops::Deref; use std::path::Path; use tokio::fs; +use url::Url; use crate::transport::{DEFAULT_KEEPALIVE_INTERVAL, DEFAULT_KEEPALIVE_SECS, DEFAULT_NODELAY}; @@ -20,7 +21,7 @@ impl Debug for MaskedString { } impl Deref for MaskedString { - type Target = String; + type Target = str; fn deref(&self) -> &Self::Target { &self.0 } @@ -142,36 +143,38 @@ fn default_keepalive_interval() -> u64 { DEFAULT_KEEPALIVE_INTERVAL } -#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)] +#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)] #[serde(deny_unknown_fields)] -pub struct TransportConfig { - #[serde(rename = "type")] - pub transport_type: TransportType, +pub struct TcpConfig { #[serde(default = "default_nodelay")] pub nodelay: bool, #[serde(default = "default_keepalive_secs")] pub keepalive_secs: u64, #[serde(default = "default_keepalive_interval")] pub keepalive_interval: u64, - pub tls: Option, - pub noise: Option, + pub proxy: Option, } -impl Default for TransportConfig { - fn default() -> TransportConfig { - TransportConfig { - transport_type: Default::default(), +impl Default for TcpConfig { + fn default() -> Self { + Self { nodelay: default_nodelay(), keepalive_secs: default_keepalive_secs(), keepalive_interval: default_keepalive_interval(), - tls: None, - noise: None, + proxy: None, } } } -fn default_transport() -> TransportConfig { - Default::default() +#[derive(Debug, Serialize, Deserialize, PartialEq, Clone, Default)] +#[serde(deny_unknown_fields)] +pub struct TransportConfig { + #[serde(rename = "type")] + pub transport_type: TransportType, + #[serde(default)] + pub tcp: TcpConfig, + pub tls: Option, + pub noise: Option, } #[derive(Debug, Serialize, Deserialize, Default, PartialEq, Clone)] @@ -180,7 +183,7 @@ pub struct ClientConfig { pub remote_addr: String, pub default_token: Option, pub services: HashMap, - #[serde(default = "default_transport")] + #[serde(default)] pub transport: TransportConfig, } @@ -190,7 +193,7 @@ pub struct ServerConfig { pub bind_addr: String, pub default_token: Option, pub services: HashMap, - #[serde(default = "default_transport")] + #[serde(default)] pub transport: TransportConfig, } @@ -255,6 +258,15 @@ impl Config { } fn validate_transport_config(config: &TransportConfig, is_server: bool) -> Result<()> { + config + .tcp + .proxy + .as_ref() + .map_or(Ok(()), |u| match u.scheme() { + "socks5" => Ok(()), + "http" => Ok(()), + _ => Err(anyhow!(format!("Unknown proxy scheme: {}", u.scheme()))), + })?; match config.transport_type { TransportType::Tcp => Ok(()), TransportType::Tls => { diff --git a/src/helper.rs b/src/helper.rs index e52de66..0f4a3bd 100644 --- a/src/helper.rs +++ b/src/helper.rs @@ -1,4 +1,5 @@ use anyhow::{anyhow, Result}; +use async_http_proxy::{http_connect_tokio, http_connect_tokio_with_basic_auth}; use backoff::{backoff::Backoff, Notify}; use socket2::{SockRef, TcpKeepalive}; use std::{future::Future, net::SocketAddr, time::Duration}; @@ -7,6 +8,7 @@ use tokio::{ sync::broadcast, }; use tracing::trace; +use url::Url; // Tokio hesitates to expose this option...So we have to do it on our own :( // The good news is that using socket2 it can be easily done, without losing portability. @@ -38,12 +40,21 @@ pub fn feature_not_compile(feature: &str) -> ! { ) } -/// Create a UDP socket and connect to `addr` -pub async fn udp_connect(addr: A) -> Result { - let addr = lookup_host(addr) +async fn to_socket_addr(addr: A) -> Result { + lookup_host(addr) .await? .next() - .ok_or(anyhow!("Failed to lookup the host"))?; + .ok_or(anyhow!("Failed to lookup the host")) +} + +pub fn host_port_pair(s: &str) -> Result<(&str, u16)> { + let semi = s.rfind(':').expect("missing semicolon"); + Ok((&s[..semi], s[semi + 1..].parse()?)) +} + +/// Create a UDP socket and connect to `addr` +pub async fn udp_connect(addr: A) -> Result { + let addr = to_socket_addr(addr).await?; let bind_addr = match addr { SocketAddr::V4(_) => "0.0.0.0:0", @@ -55,6 +66,52 @@ pub async fn udp_connect(addr: A) -> Result { Ok(s) } +/// Create a TcpStream using a proxy +/// e.g. socks5://user:pass@127.0.0.1:1080 http://127.0.0.1:8080 +pub async fn tcp_connect_with_proxy(addr: &str, proxy: Option<&Url>) -> Result { + if let Some(url) = proxy { + let mut s = TcpStream::connect(( + url.host_str().expect("proxy url should have host field"), + url.port().expect("proxy url should have port field"), + )) + .await?; + + let auth = if !url.username().is_empty() || url.password().is_some() { + Some(async_socks5::Auth { + username: url.username().into(), + password: url.password().unwrap_or("").into(), + }) + } else { + None + }; + match url.scheme() { + "socks5" => { + async_socks5::connect(&mut s, host_port_pair(addr)?, auth).await?; + } + "http" => { + let (host, port) = host_port_pair(addr)?; + match auth { + Some(auth) => { + http_connect_tokio_with_basic_auth( + &mut s, + host, + port, + &auth.username, + &auth.password, + ) + .await? + } + None => http_connect_tokio(&mut s, host, port).await?, + } + } + _ => panic!("unknown proxy scheme"), + } + Ok(s) + } else { + Ok(TcpStream::connect(addr).await?) + } +} + // Wrapper of retry_notify pub async fn retry_notify_with_deadline( backoff: B, diff --git a/src/transport/mod.rs b/src/transport/mod.rs index 8660c6c..c76d7f2 100644 --- a/src/transport/mod.rs +++ b/src/transport/mod.rs @@ -1,4 +1,4 @@ -use crate::config::{ClientServiceConfig, ServerServiceConfig, TransportConfig}; +use crate::config::{ClientServiceConfig, ServerServiceConfig, TcpConfig, TransportConfig}; use crate::helper::try_set_tcp_keepalive; use anyhow::{Context, Result}; use async_trait::async_trait; @@ -27,6 +27,7 @@ pub trait Transport: Debug + Send + Sync { /// Provide the transport with socket options, which can be handled at the need of the transport fn hint(conn: &Self::Stream, opts: SocketOpts); async fn bind(&self, addr: T) -> Result; + /// accept must be cancel safe async fn accept(&self, a: &Self::Acceptor) -> Result<(Self::RawStream, SocketAddr)>; async fn handshake(&self, conn: Self::RawStream) -> Result; async fn connect(&self, addr: &str) -> Result; @@ -78,7 +79,7 @@ impl SocketOpts { } impl SocketOpts { - pub fn from_transport_cfg(cfg: &TransportConfig) -> SocketOpts { + pub fn from_cfg(cfg: &TcpConfig) -> SocketOpts { SocketOpts { nodelay: Some(cfg.nodelay), keepalive: Some(Keepalive { diff --git a/src/transport/tcp.rs b/src/transport/tcp.rs index 6895e07..f2d360a 100644 --- a/src/transport/tcp.rs +++ b/src/transport/tcp.rs @@ -1,4 +1,7 @@ -use crate::config::TransportConfig; +use crate::{ + config::{TcpConfig, TransportConfig}, + helper::tcp_connect_with_proxy, +}; use super::{SocketOpts, Transport}; use anyhow::Result; @@ -9,6 +12,7 @@ use tokio::net::{TcpListener, TcpStream, ToSocketAddrs}; #[derive(Debug)] pub struct TcpTransport { socket_opts: SocketOpts, + cfg: TcpConfig, } #[async_trait] @@ -19,7 +23,8 @@ impl Transport for TcpTransport { fn new(config: &TransportConfig) -> Result { Ok(TcpTransport { - socket_opts: SocketOpts::from_transport_cfg(config), + socket_opts: SocketOpts::from_cfg(&config.tcp), + cfg: config.tcp.clone(), }) } @@ -42,7 +47,7 @@ impl Transport for TcpTransport { } async fn connect(&self, addr: &str) -> Result { - let s = TcpStream::connect(addr).await?; + let s = tcp_connect_with_proxy(addr, self.cfg.proxy.as_ref()).await?; self.socket_opts.apply(&s); Ok(s) } diff --git a/src/transport/tls.rs b/src/transport/tls.rs index 0fead19..7204704 100644 --- a/src/transport/tls.rs +++ b/src/transport/tls.rs @@ -2,6 +2,7 @@ use std::net::SocketAddr; use super::{SocketOpts, TcpTransport, Transport}; use crate::config::{TlsConfig, TransportConfig}; +use crate::helper::host_port_pair; use anyhow::{anyhow, Context, Result}; use async_trait::async_trait; use std::fs; @@ -94,8 +95,8 @@ impl Transport for TlsTransport { .connect( self.config .hostname - .as_ref() - .unwrap_or(&String::from(addr.split(':').next().unwrap())), + .as_deref() + .unwrap_or(host_port_pair(addr)?.0), conn, ) .await?)