fix: improve log (#108)

* fix: improve log format

* fix: set data channel errors to warn level

* fix: make span name clearer
Yujia Qiao 2022-01-17 18:05:04 +08:00 committed by GitHub
parent 8caaf0f776
commit 1fe3509536
5 changed files with 27 additions and 29 deletions
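The diff below makes the three small changes called out in the commit message: anyhow errors are logged with the alternate display format {:#} instead of the debug format {:?}, failures on per-connection data channels are downgraded from error! to warn!, and tracing spans get shorter explicit names ("handle", "connection"). As a rough sketch of the formatting difference only (not code from this repository; the address and message below are made up):

use anyhow::{Context, Result};

fn connect() -> Result<()> {
    // Hypothetical failing operation, just to produce an error with context.
    Err(std::io::Error::from(std::io::ErrorKind::ConnectionRefused))
        .with_context(|| "Failed to connect to 127.0.0.1:2333")
}

fn main() {
    if let Err(e) = connect() {
        // Debug formatting: a multi-line report with a "Caused by:" section
        // (and a backtrace when enabled), which is noisy inside one log record.
        eprintln!("{:?}", e);
        // Alternate display formatting: the whole context chain on one line,
        // roughly "Failed to connect to 127.0.0.1:2333: connection refused".
        eprintln!("{:#}", e);
    }
}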

View File

@@ -174,13 +174,13 @@ async fn do_data_channel_handshake<T: Transport>(
.connector
.connect(&args.remote_addr)
.await
.with_context(|| "Failed to connect to remote_addr")?;
.with_context(|| format!("Failed to connect to {}", &args.remote_addr))?;
T::hint(&conn, args.socket_opts);
Ok(conn)
},
|e, duration| {
warn!("{:?}. Retry in {:?}", e, duration);
warn!("{:#}. Retry in {:?}", e, duration);
},
)
.await?;
@@ -220,7 +220,7 @@ async fn run_data_channel_for_tcp<T: Transport>(
let mut local = TcpStream::connect(local_addr)
.await
.with_context(|| "Failed to connect to local_addr")?;
.with_context(|| format!("Failed to connect to {}", local_addr))?;
let _ = copy_bidirectional(&mut conn, &mut local).await;
Ok(())
}
@@ -295,7 +295,7 @@ async fn run_data_channel_for_udp<T: Transport>(conn: T::Stream, local_addr: &st
));
}
Err(e) => {
error!("{:?}", e);
error!("{:#}", e);
}
}
}
@@ -383,7 +383,7 @@ impl<T: 'static + Transport> ControlChannel<T> {
.transport
.connect(&self.remote_addr)
.await
.with_context(|| format!("Failed to connect to the server: {}", &self.remote_addr))?;
.with_context(|| format!("Failed to connect to {}", &self.remote_addr))?;
T::hint(&conn, SocketOpts::for_control_channel());
// Send hello
@@ -448,7 +448,7 @@ impl<T: 'static + Transport> ControlChannel<T> {
let args = data_ch_args.clone();
tokio::spawn(async move {
if let Err(e) = run_data_channel(args).await.with_context(|| "Failed to run the data channel") {
error!("{:?}", e);
warn!("{:#}", e);
}
}.instrument(Span::current()));
}
@@ -466,7 +466,7 @@ impl<T: 'static + Transport> ControlChannel<T> {
}
impl ControlChannelHandle {
-#[instrument(skip_all, fields(service = %service.name))]
+#[instrument(name="handle", skip_all, fields(service = %service.name))]
fn new<T: 'static + Transport>(
service: ClientServiceConfig,
remote_addr: String,
@@ -497,10 +497,10 @@ impl ControlChannelHandle {
}
if let Some(duration) = backoff.next_backoff() {
error!("{:?}\n\nRetry in {:?}...", err, duration);
error!("{:#}. Retry in {:?}...", err, duration);
time::sleep(duration).await;
} else {
error!("{:?}. Break", err);
error!("{:#}. Break", err);
}
}
}

View File

@@ -149,7 +149,7 @@ async fn config_watcher(
let _ = fevent_tx.blocking_send(true);
}
}
Err(e) => error!("watch error: {:?}", e),
Err(e) => error!("watch error: {:#}", e),
})?;
watcher.watch(&path, RecursiveMode::NonRecursive)?;
@@ -164,7 +164,7 @@ async fn config_watcher(
let new = match Config::from_file(&path).await.with_context(|| "The changed configuration is invalid. Ignored") {
Ok(v) => v,
Err(e) => {
error!("{:?}", e);
error!("{:#}", e);
// If the config is invalid, just ignore it
continue;
}

View File

@@ -205,7 +205,7 @@ pub async fn read_control_cmd<T: AsyncRead + AsyncWrite + Unpin>(
let mut bytes = vec![0u8; PACKET_LEN.c_cmd];
conn.read_exact(&mut bytes)
.await
.with_context(|| "Failed to read control cmd")?;
.with_context(|| "Failed to read cmd")?;
bincode::deserialize(&bytes).with_context(|| "Failed to deserialize control cmd")
}
@@ -215,6 +215,6 @@ pub async fn read_data_cmd<T: AsyncRead + AsyncWrite + Unpin>(
let mut bytes = vec![0u8; PACKET_LEN.d_cmd];
conn.read_exact(&mut bytes)
.await
.with_context(|| "Failed to read data cmd")?;
.with_context(|| "Failed to read cmd")?;
bincode::deserialize(&bytes).with_context(|| "Failed to deserialize data cmd")
}

View File

@@ -149,7 +149,7 @@ impl<'a, T: 'static + Transport> Server<'a, T> {
// EMFILE. So sleep for a while and retry
// TODO: Only sleep for EMFILE, ENFILE, ENOMEM, ENOBUFS
if let Some(d) = backoff.next_backoff() {
error!("Failed to accept: {}. Retry in {:?}...", err, d);
error!("Failed to accept: {:#}. Retry in {:?}...", err, d);
time::sleep(d).await;
} else {
// This branch will never be executed according to the current retry policy
@@ -172,11 +172,11 @@ impl<'a, T: 'static + Transport> Server<'a, T> {
let control_channels = self.control_channels.clone();
tokio::spawn(async move {
if let Err(err) = handle_connection(conn, services, control_channels).await {
error!("{:?}", err);
error!("{:#}", err);
}
}.instrument(info_span!("handle_connection", %addr)));
}.instrument(info_span!("connection", %addr)));
}, Err(e) => {
error!("{:?}", e);
error!("{:#}", e);
}
}
},
@@ -369,7 +369,7 @@ where
{
// Create a control channel handle, where the control channel handling task
// and the connection pool task are created.
-#[instrument(skip_all, fields(service = %service.name))]
+#[instrument(name = "handle", skip_all, fields(service = %service.name))]
fn new(conn: T::Stream, service: ServerServiceConfig) -> ControlChannelHandle<T> {
// Create a shutdown channel
let (shutdown_tx, shutdown_rx) = broadcast::channel::<bool>(1);
@@ -406,7 +406,7 @@ where
.await
.with_context(|| "Failed to run TCP connection pool")
{
error!("{:?}", e);
error!("{:#}", e);
}
}
.instrument(Span::current()),
@@ -422,7 +422,7 @@ where
.await
.with_context(|| "Failed to run TCP connection pool")
{
error!("{:?}", e);
error!("{:#}", e);
}
}
.instrument(Span::current()),
@@ -433,7 +433,6 @@ where
let ch = ControlChannel::<T> {
conn,
shutdown_rx,
-service: service.clone(),
data_ch_req_rx,
};
@@ -441,7 +440,7 @@ where
tokio::spawn(
async move {
if let Err(err) = ch.run().await {
error!("{:?}", err);
error!("{:#}", err);
}
}
.instrument(Span::current()),
@@ -458,14 +457,13 @@ where
// Control channel, using T as the transport layer. P is TcpStream or UdpTraffic
struct ControlChannel<T: Transport> {
conn: T::Stream, // The connection of control channel
-service: ServerServiceConfig, // A copy of the corresponding service config
shutdown_rx: broadcast::Receiver<bool>, // Receives the shutdown signal
data_ch_req_rx: mpsc::UnboundedReceiver<bool>, // Receives visitor connections
}
impl<T: Transport> ControlChannel<T> {
// Run a control channel
-#[instrument(skip(self), fields(service = %self.service.name))]
+#[instrument(skip_all)]
async fn run(mut self) -> Result<()> {
let cmd = bincode::serialize(&ControlChannelCmd::CreateDataChannel).unwrap();
@@ -476,11 +474,11 @@ impl<T: Transport> ControlChannel<T> {
match val {
Some(_) => {
if let Err(e) = self.conn.write_all(&cmd).await.with_context(||"Failed to write control cmds") {
error!("{:?}", e);
error!("{:#}", e);
break;
}
if let Err(e) = self.conn.flush().await.with_context(|| "Failed to flush control cmds") {
error!("{:?}", e);
error!("{:#}", e);
break;
}
}
@@ -522,7 +520,7 @@ fn tcp_listen_and_send(
let l: TcpListener = match l {
Ok(v) => v,
Err(e) => {
error!("{:?}", e);
error!("{:#}", e);
return;
}
};

View File

@@ -110,7 +110,7 @@ impl SocketOpts {
if let Err(e) = try_set_tcp_keepalive(conn, keepalive_duration, keepalive_interval)
.with_context(|| "Failed to set keepalive")
{
error!("{:?}", e);
error!("{:#}", e);
}
}
@@ -120,7 +120,7 @@ impl SocketOpts {
.set_nodelay(nodelay)
.with_context(|| "Failed to set nodelay")
{
error!("{:?}", e);
error!("{:#}", e);
}
}
}
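The span renames above (instrument name "handle" on ControlChannelHandle::new, and the "connection" span in the accept loop) only change the prefix that tracing prints in front of each log line. A minimal sketch of the mechanism, assuming the tracing and tracing-subscriber crates; the free function and field names here are made up for illustration, not the repository's code:

use tracing::{info, instrument};

// Without name = "...", the span is named after the function ("new");
// with it, log lines carry the shorter, clearer name "handle".
#[instrument(name = "handle", skip_all, fields(service = %service_name))]
fn new(service_name: &str) {
    info!("control channel started");
}

fn main() {
    tracing_subscriber::fmt::init();
    // Prints something like: "INFO handle{service=foo}: control channel started"
    new("foo");
}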