feat: configurable retry interval (#208)

This commit is contained in:
Yujia Qiao 2022-11-29 17:41:23 +08:00 committed by GitHub
parent 87d06c91b9
commit d216d6380f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 46 additions and 20 deletions

View File

@ -16,12 +16,13 @@ rathole类似于 [frp](https://github.com/fatedier/frp) 和 [ngrok](https://g
<!-- TOC -->
- [Features](#features)
- [Quickstart](#quickstart)
- [Configuration](#configuration)
- [Logging](#logging)
- [Benchmark](#benchmark)
- [Development Status](#development-status)
- [rathole](#rathole)
- [Features](#features)
- [Quickstart](#quickstart)
- [Configuration](#configuration)
- [Logging](#logging)
- [Benchmark](#benchmark)
- [Development Status](#development-status)
<!-- /TOC -->
@ -104,6 +105,7 @@ local_addr = "127.0.0.1:22" # 需要被转发的服务的地址
remote_addr = "example.com:2333" # Necessary. The address of the server
default_token = "default_token_if_not_specify" # Optional. The default token of services, if they don't define their own ones
heartbeat_timeout = 40 # Optional. Set to 0 to disable the application-layer heartbeat test. The value must be greater than `server.heartbeat_interval`. Default: 40 seconds
retry_interval = 1 # Optional. The interval between retry to connect to the server. Default: 1 second
[client.transport] # The whole block is optional. Specify which transport to use
type = "tcp" # Optional. Possible values: ["tcp", "tls", "noise"]. Default: "tcp"
@ -159,6 +161,7 @@ type = "tcp" # Optional. Same as the client `[client.services.X.type]
token = "whatever" # Necessary if `server.default_token` not set
bind_addr = "0.0.0.0:8081" # Necessary. The address of the service is exposed at. Generally only the port needs to be change.
nodelay = false # Optional. Same as the client
retry_interval = 1 # Optional. The interval between retry to connect to the server. Default: inherits the global config
[server.services.service2]
bind_addr = "0.0.0.1:8082"

View File

@ -17,12 +17,13 @@ rathole, like [frp](https://github.com/fatedier/frp) and [ngrok](https://github.
<!-- TOC -->
- [Features](#features)
- [Quickstart](#quickstart)
- [Configuration](#configuration)
- [Logging](#logging)
- [Benchmark](#benchmark)
- [Development Status](#development-status)
- [rathole](#rathole)
- [Features](#features)
- [Quickstart](#quickstart)
- [Configuration](#configuration)
- [Logging](#logging)
- [Benchmark](#benchmark)
- [Development Status](#development-status)
<!-- /TOC -->
@ -106,6 +107,7 @@ Here is the full configuration specification:
remote_addr = "example.com:2333" # Necessary. The address of the server
default_token = "default_token_if_not_specify" # Optional. The default token of services, if they don't define their own ones
heartbeat_timeout = 40 # Optional. Set to 0 to disable the application-layer heartbeat test. The value must be greater than `server.heartbeat_interval`. Default: 40 seconds
retry_interval = 1 # Optional. The interval between retry to connect to the server. Default: 1 second
[client.transport] # The whole block is optional. Specify which transport to use
type = "tcp" # Optional. Possible values: ["tcp", "tls", "noise"]. Default: "tcp"
@ -130,6 +132,7 @@ type = "tcp" # Optional. The protocol that needs forwarding. Possible values: ["
token = "whatever" # Necessary if `client.default_token` not set
local_addr = "127.0.0.1:1081" # Necessary. The address of the service that needs to be forwarded
nodelay = false # Optional. Determine whether to enable TCP_NODELAY for data transmission, if applicable, to improve the latency but decrease the bandwidth. Default: false
retry_interval = 1 # Optional. The interval between retry to connect to the server. Default: inherits the global config
[client.services.service2] # Multiple services can be defined
local_addr = "127.0.0.1:1082"

View File

@ -494,6 +494,9 @@ impl ControlChannelHandle {
info!("Starting {}", hex::encode(digest));
let (shutdown_tx, shutdown_rx) = oneshot::channel();
let mut retry_backoff = run_control_chan_backoff(service.retry_interval.unwrap());
let mut s = ControlChannel {
digest,
service,
@ -505,7 +508,6 @@ impl ControlChannelHandle {
tokio::spawn(
async move {
let mut backoff = run_control_chan_backoff();
let mut start = Instant::now();
while let Err(err) = s
@ -519,10 +521,10 @@ impl ControlChannelHandle {
if start.elapsed() > Duration::from_secs(3) {
// The client runs for at least 3 secs and then disconnects
// Retry immediately
backoff.reset();
error!("{:#}. Retry...", err);
} else if let Some(duration) = backoff.next_backoff() {
retry_backoff.reset();
}
if let Some(duration) = retry_backoff.next_backoff() {
error!("{:#}. Retry in {:?}...", err, duration);
time::sleep(duration).await;
} else {

View File

@ -13,6 +13,9 @@ use crate::transport::{DEFAULT_KEEPALIVE_INTERVAL, DEFAULT_KEEPALIVE_SECS, DEFAU
const DEFAULT_HEARTBEAT_INTERVAL_SECS: u64 = 30;
const DEFAULT_HEARTBEAT_TIMEOUT_SECS: u64 = 40;
/// Client
const DEFAULT_CLIENT_RETRY_INTERVAL_SECS: u64 = 1;
/// String with Debug implementation that emits "MASKED"
/// Used to mask sensitive strings when logging
#[derive(Serialize, Deserialize, Default, PartialEq, Eq, Clone)]
@ -53,6 +56,8 @@ impl Default for TransportType {
}
}
/// Per service config
/// All Option are optional in configuration but must be Some value in runtime
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq, Default)]
#[serde(deny_unknown_fields)]
pub struct ClientServiceConfig {
@ -63,6 +68,7 @@ pub struct ClientServiceConfig {
pub local_addr: String,
pub token: Option<MaskedString>,
pub nodelay: Option<bool>,
pub retry_interval: Option<u64>,
}
impl ClientServiceConfig {
@ -92,6 +98,8 @@ fn default_service_type() -> ServiceType {
Default::default()
}
/// Per service config
/// All Option are optional in configuration but must be Some value in runtime
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq, Default)]
#[serde(deny_unknown_fields)]
pub struct ServerServiceConfig {
@ -185,6 +193,10 @@ fn default_heartbeat_timeout() -> u64 {
DEFAULT_HEARTBEAT_TIMEOUT_SECS
}
fn default_client_retry_interval() -> u64 {
DEFAULT_CLIENT_RETRY_INTERVAL_SECS
}
#[derive(Debug, Serialize, Deserialize, Default, PartialEq, Eq, Clone)]
#[serde(deny_unknown_fields)]
pub struct ClientConfig {
@ -195,6 +207,8 @@ pub struct ClientConfig {
pub transport: TransportConfig,
#[serde(default = "default_heartbeat_timeout")]
pub heartbeat_timeout: u64,
#[serde(default = "default_client_retry_interval")]
pub retry_interval: u64,
}
fn default_heartbeat_interval() -> u64 {
@ -266,6 +280,9 @@ impl Config {
bail!("The token of service {} is not set", name);
}
}
if s.retry_interval.is_none() {
s.retry_interval = Some(client.retry_interval);
}
}
Config::validate_transport_config(&client.transport, false)?;

View File

@ -15,11 +15,12 @@ pub fn listen_backoff() -> ExponentialBackoff {
}
}
pub fn run_control_chan_backoff() -> ExponentialBackoff {
pub fn run_control_chan_backoff(interval: u64) -> ExponentialBackoff {
ExponentialBackoff {
randomization_factor: 0.1,
randomization_factor: 0.2,
max_elapsed_time: None,
max_interval: Duration::from_secs(1),
multiplier: 3.0,
max_interval: Duration::from_secs(interval),
..Default::default()
}
}