From f92398ea310745ac33a035a04917c92563d50058 Mon Sep 17 00:00:00 2001 From: Yujia Qiao Date: Sat, 18 Dec 2021 16:23:43 +0800 Subject: [PATCH] refactor: fix clippy, merge imports Fix lints of clippy Merge imports --- .rustfmt.toml | 1 + src/client.rs | 32 +++++++++++++------------------- src/config.rs | 8 +++----- src/lib.rs | 24 ++++++++++-------------- src/main.rs | 1 - src/server.rs | 43 ++++++++++++++++++------------------------- src/transport/mod.rs | 8 +++----- src/transport/tcp.rs | 5 +++-- src/transport/tls.rs | 22 ++++++++-------------- 9 files changed, 59 insertions(+), 85 deletions(-) create mode 100644 .rustfmt.toml diff --git a/.rustfmt.toml b/.rustfmt.toml new file mode 100644 index 0000000..3226759 --- /dev/null +++ b/.rustfmt.toml @@ -0,0 +1 @@ +imports_granularity = "module" diff --git a/src/client.rs b/src/client.rs index 092e0a8..25e6f1c 100644 --- a/src/client.rs +++ b/src/client.rs @@ -2,12 +2,11 @@ use std::collections::HashMap; use std::sync::Arc; use crate::config::{ClientConfig, ClientServiceConfig, Config, TransportType}; +use crate::protocol::Hello::{self, *}; use crate::protocol::{ - self, Ack, Auth, ControlChannelCmd, DataChannelCmd, - Hello::{self, *}, - CURRENT_PROTO_VRESION, HASH_WIDTH_IN_BYTES, + self, read_ack, read_control_cmd, read_data_cmd, read_hello, Ack, Auth, ControlChannelCmd, + DataChannelCmd, CURRENT_PROTO_VRESION, HASH_WIDTH_IN_BYTES, }; -use crate::protocol::{read_ack, read_control_cmd, read_data_cmd, read_hello}; use crate::transport::{TcpTransport, TlsTransport, Transport}; use anyhow::{anyhow, bail, Context, Result}; use backoff::ExponentialBackoff; @@ -28,11 +27,11 @@ pub async fn run_client(config: &Config) -> Result<()> { match config.transport.transport_type { TransportType::Tcp => { - let mut client = Client::::from(&config).await?; + let mut client = Client::::from(config).await?; client.run().await } TransportType::Tls => { - let mut client = Client::::from(&config).await?; + let mut client = Client::::from(config).await?; client.run().await } } @@ -244,19 +243,14 @@ impl ControlChannelHandle { tokio::spawn( async move { - loop { - if let Err(err) = s - .run() - .await - .with_context(|| "Failed to run the control channel") - { - let duration = Duration::from_secs(2); - error!("{:?}\n\nRetry in {:?}...", err, duration); - time::sleep(duration).await; - } else { - // Shutdown - break; - } + while let Err(err) = s + .run() + .await + .with_context(|| "Failed to run the control channel") + { + let duration = Duration::from_secs(2); + error!("{:?}\n\nRetry in {:?}...", err, duration); + time::sleep(duration).await; } } .instrument(Span::current()), diff --git a/src/config.rs b/src/config.rs index baff303..2b844be 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,9 +1,8 @@ use anyhow::{anyhow, bail, Context, Result}; use serde::{Deserialize, Serialize}; use std::collections::HashMap; -use std::path::PathBuf; +use std::path::Path; use tokio::fs; -use toml; #[derive(Debug, Serialize, Deserialize, Copy, Clone)] pub enum TransportType { @@ -81,8 +80,7 @@ pub struct Config { impl Config { fn from_str(s: &str) -> Result { - let mut config: Config = - toml::from_str(&s).with_context(|| "Failed to parse the config")?; + let mut config: Config = toml::from_str(s).with_context(|| "Failed to parse the config")?; if let Some(server) = config.server.as_mut() { Config::validate_server_config(server)?; @@ -158,7 +156,7 @@ impl Config { } } - pub async fn from_file(path: &PathBuf) -> Result { + pub async fn from_file(path: &Path) -> Result { let s: String = fs::read_to_string(path) .await .with_context(|| format!("Failed to read the config {:?}", path))?; diff --git a/src/lib.rs b/src/lib.rs index b328c76..0f1d9a2 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -26,7 +26,7 @@ pub async fn run(args: &Cli) -> Result<()> { // Raise `nofile` limit on linux and mac fdlimit::raise_fd_limit(); - match determine_run_mode(&config, &args) { + match determine_run_mode(&config, args) { RunMode::Undetermine => Err(anyhow!("Cannot determine running as a server or a client")), RunMode::Client => run_client(&config).await, RunMode::Server => run_server(&config).await, @@ -44,20 +44,16 @@ fn determine_run_mode(config: &Config, args: &Cli) -> RunMode { use RunMode::*; if args.client && args.server { Undetermine + } else if args.client { + Client + } else if args.server { + Server + } else if config.client.is_some() && config.server.is_none() { + Client + } else if config.server.is_some() && config.client.is_none() { + Server } else { - if args.client { - Client - } else if args.server { - Server - } else { - if config.server.is_some() && config.client.is_none() { - Server - } else if config.client.is_some() && config.server.is_none() { - Client - } else { - Undetermine - } - } + Undetermine } } diff --git a/src/main.rs b/src/main.rs index 3a47d30..84c16d6 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,7 +1,6 @@ use anyhow::Result; use clap::Parser; use rathole::{run, Cli}; -use tokio; #[tokio::main] async fn main() -> Result<()> { diff --git a/src/server.rs b/src/server.rs index 4ab4800..3da2ba2 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,13 +1,13 @@ use crate::config::{Config, ServerConfig, ServerServiceConfig, TransportType}; use crate::multi_map::MultiMap; +use crate::protocol::Hello::{ControlChannelHello, DataChannelHello}; use crate::protocol::{ - self, Ack, ControlChannelCmd, DataChannelCmd, Hello, Hello::ControlChannelHello, - Hello::DataChannelHello, HASH_WIDTH_IN_BYTES, + self, read_auth, read_hello, Ack, ControlChannelCmd, DataChannelCmd, Hello, HASH_WIDTH_IN_BYTES, }; -use crate::protocol::{read_auth, read_hello}; use crate::transport::{TcpTransport, TlsTransport, Transport}; use anyhow::{anyhow, bail, Context, Result}; -use backoff::{backoff::Backoff, ExponentialBackoff}; +use backoff::backoff::Backoff; +use backoff::ExponentialBackoff; use rand::RngCore; use std::collections::HashMap; use std::net::SocketAddr; @@ -15,8 +15,7 @@ use std::sync::Arc; use std::time::Duration; use tokio::io::{self, copy_bidirectional, AsyncWriteExt}; use tokio::net::{TcpListener, TcpStream}; -use tokio::sync::mpsc; -use tokio::sync::{oneshot, RwLock}; +use tokio::sync::{mpsc, oneshot, RwLock}; use tokio::time; use tracing::{debug, error, info, info_span, warn, Instrument}; @@ -190,9 +189,7 @@ async fn do_control_channel_handshake( concat.append(&mut nonce); // Read auth - let d = match read_auth(&mut conn).await? { - protocol::Auth(v) => v, - }; + let protocol::Auth(d) = read_auth(&mut conn).await?; // Validate let session_key = protocol::digest(&concat); @@ -259,13 +256,13 @@ struct ControlChannel { } struct ControlChannelHandle { - shutdown_tx: oneshot::Sender, + _shutdown_tx: oneshot::Sender, conn_pool: ConnectionPoolHandle, } impl ControlChannelHandle { fn new(conn: T::Stream, service: ServerServiceConfig) -> ControlChannelHandle { - let (shutdown_tx, shutdown_rx) = oneshot::channel::(); + let (_shutdown_tx, shutdown_rx) = oneshot::channel::(); let name = service.name.clone(); let conn_pool = ConnectionPoolHandle::new(); let actor: ControlChannel = ControlChannel { @@ -282,7 +279,7 @@ impl ControlChannelHandle { }); ControlChannelHandle { - shutdown_tx, + _shutdown_tx, conn_pool, } } @@ -309,7 +306,7 @@ impl ControlChannel { let (data_req_tx, mut data_req_rx) = mpsc::unbounded_channel::(); tokio::spawn(async move { let cmd = bincode::serialize(&ControlChannelCmd::CreateDataChannel).unwrap(); - while let Some(_) = data_req_rx.recv().await { + while data_req_rx.recv().await.is_some() { if self.conn.write_all(&cmd).await.is_err() { break; } @@ -396,18 +393,14 @@ impl ConnectionPoolHandle { impl ConnectionPool { #[tracing::instrument] async fn run(mut self) { - loop { - if let Some(mut visitor) = self.visitor_rx.recv().await { - if let Some(mut ch) = self.data_ch_rx.recv().await { - tokio::spawn(async move { - let cmd = bincode::serialize(&DataChannelCmd::StartForward).unwrap(); - if ch.write_all(&cmd).await.is_ok() { - let _ = copy_bidirectional(&mut ch, &mut visitor).await; - } - }); - } else { - break; - } + while let Some(mut visitor) = self.visitor_rx.recv().await { + if let Some(mut ch) = self.data_ch_rx.recv().await { + tokio::spawn(async move { + let cmd = bincode::serialize(&DataChannelCmd::StartForward).unwrap(); + if ch.write_all(&cmd).await.is_ok() { + let _ = copy_bidirectional(&mut ch, &mut visitor).await; + } + }); } else { break; } diff --git a/src/transport/mod.rs b/src/transport/mod.rs index 000a479..468ea25 100644 --- a/src/transport/mod.rs +++ b/src/transport/mod.rs @@ -3,10 +3,8 @@ use anyhow::Result; use async_trait::async_trait; use std::fmt::Debug; use std::net::SocketAddr; -use tokio::{ - io::{AsyncRead, AsyncWrite}, - net::ToSocketAddrs, -}; +use tokio::io::{AsyncRead, AsyncWrite}; +use tokio::net::ToSocketAddrs; #[async_trait] pub trait Transport: Debug + Send + Sync { @@ -16,7 +14,7 @@ pub trait Transport: Debug + Send + Sync { async fn new(config: &TransportConfig) -> Result>; async fn bind(&self, addr: T) -> Result; async fn accept(&self, a: &Self::Acceptor) -> Result<(Self::Stream, SocketAddr)>; - async fn connect(&self, addr: &String) -> Result; + async fn connect(&self, addr: &str) -> Result; } mod tcp; diff --git a/src/transport/tcp.rs b/src/transport/tcp.rs index df2d541..ebbcf08 100644 --- a/src/transport/tcp.rs +++ b/src/transport/tcp.rs @@ -1,4 +1,5 @@ -use crate::{config::TransportConfig, helper::set_tcp_keepalive}; +use crate::config::TransportConfig; +use crate::helper::set_tcp_keepalive; use super::Transport; use anyhow::Result; @@ -28,7 +29,7 @@ impl Transport for TcpTransport { Ok((s, addr)) } - async fn connect(&self, addr: &String) -> Result { + async fn connect(&self, addr: &str) -> Result { let s = TcpStream::connect(addr).await?; if let Err(e) = set_tcp_keepalive(&s) { error!( diff --git a/src/transport/tls.rs b/src/transport/tls.rs index 73bffb7..d6a246f 100644 --- a/src/transport/tls.rs +++ b/src/transport/tls.rs @@ -1,20 +1,14 @@ use std::net::SocketAddr; use super::Transport; -use crate::{ - config::{TlsConfig, TransportConfig}, - helper::set_tcp_keepalive, -}; +use crate::config::{TlsConfig, TransportConfig}; +use crate::helper::set_tcp_keepalive; use anyhow::{anyhow, Context, Result}; use async_trait::async_trait; -use tokio::{ - fs, - net::{TcpListener, TcpStream, ToSocketAddrs}, -}; -use tokio_native_tls::{ - native_tls::{self, Certificate, Identity}, - TlsAcceptor, TlsConnector, TlsStream, -}; +use tokio::fs; +use tokio::net::{TcpListener, TcpStream, ToSocketAddrs}; +use tokio_native_tls::native_tls::{self, Certificate, Identity}; +use tokio_native_tls::{TlsAcceptor, TlsConnector, TlsStream}; use tracing::error; #[derive(Debug)] @@ -39,7 +33,7 @@ impl Transport for TlsTransport { let connector = match config.trusted_root.as_ref() { Some(path) => { let s = fs::read_to_string(path).await?; - let cert = Certificate::from_pem(&s.as_bytes())?; + let cert = Certificate::from_pem(s.as_bytes())?; let connector = native_tls::TlsConnector::builder() .add_root_certificate(cert) .build()?; @@ -74,7 +68,7 @@ impl Transport for TlsTransport { Ok((conn, addr)) } - async fn connect(&self, addr: &String) -> Result { + async fn connect(&self, addr: &str) -> Result { let conn = TcpStream::connect(&addr).await?; if let Err(e) = set_tcp_keepalive(&conn) { error!(