From 44acf72a736346f4b8e969c9453027ca32786d72 Mon Sep 17 00:00:00 2001 From: OlivierDehaene Date: Mon, 17 Jul 2023 19:03:07 +0200 Subject: [PATCH] fea(launcher): debug logs (#623) --- launcher/Cargo.toml | 2 +- launcher/src/main.rs | 110 ++++++++++++++++++++++++------------------- 2 files changed, 62 insertions(+), 50 deletions(-) diff --git a/launcher/Cargo.toml b/launcher/Cargo.toml index ae0694d..3e7f86d 100644 --- a/launcher/Cargo.toml +++ b/launcher/Cargo.toml @@ -13,7 +13,7 @@ nix = "0.26.2" serde = { version = "1.0.152", features = ["derive"] } serde_json = "1.0.93" tracing = "0.1.37" -tracing-subscriber = { version = "0.3.16", features = ["json"] } +tracing-subscriber = { version = "0.3.16", features = ["json", "env-filter"] } [dev-dependencies] float_eq = "1.0.1" diff --git a/launcher/src/main.rs b/launcher/src/main.rs index 113877f..e26244e 100644 --- a/launcher/src/main.rs +++ b/launcher/src/main.rs @@ -4,7 +4,7 @@ use nix::unistd::Pid; use serde::Deserialize; use std::env; use std::ffi::OsString; -use std::io::{BufRead, BufReader, Read}; +use std::io::{BufRead, BufReader, Lines, Read}; use std::os::unix::process::{CommandExt, ExitStatusExt}; use std::path::Path; use std::process::{Child, Command, ExitStatus, Stdio}; @@ -15,6 +15,7 @@ use std::thread; use std::thread::sleep; use std::time::{Duration, Instant}; use std::{fs, io}; +use tracing_subscriber::EnvFilter; mod env_runtime; @@ -286,7 +287,7 @@ struct Args { #[derive(Debug)] enum ShardStatus { Ready, - Failed((usize, Option)), + Failed(usize), } #[allow(clippy::too_many_arguments)] @@ -311,6 +312,9 @@ fn shard_manager( shutdown: Arc, _shutdown_sender: mpsc::Sender<()>, ) { + // Enter shard-manager tracing span + let _span = tracing::span!(tracing::Level::INFO, "shard-manager", rank = rank).entered(); + // Get UDS path let uds_string = format!("{uds_path}-{rank}"); let uds = Path::new(&uds_string); @@ -438,30 +442,23 @@ fn shard_manager( if err.kind() == io::ErrorKind::NotFound { tracing::error!("text-generation-server not found in PATH"); tracing::error!("Please install it with `make install-server`") - } else { + } + { tracing::error!("{}", err); } - status_sender - .send(ShardStatus::Failed((rank, Some(err.to_string())))) - .unwrap(); + status_sender.send(ShardStatus::Failed(rank)).unwrap(); return; } }; // Redirect STDOUT to the console let shard_stdout_reader = BufReader::new(p.stdout.take().unwrap()); - let mut shard_stderr_reader = BufReader::new(p.stderr.take().unwrap()); + let shard_stderr_reader = BufReader::new(p.stderr.take().unwrap()); + //stdout tracing thread thread::spawn(move || { - // Enter shard-manager tracing span - let _span = tracing::span!(tracing::Level::INFO, "shard-manager", rank = rank).entered(); - for line in shard_stdout_reader.lines() { - // Parse loguru logs - if let Ok(log) = serde_json::from_str::(&line.unwrap()) { - log.trace(); - } - } + log_lines(shard_stdout_reader.lines()); }); let mut ready = false; @@ -470,30 +467,25 @@ fn shard_manager( loop { // Process exited if let Some(exit_status) = p.try_wait().unwrap() { - // We read stderr in another thread as it seems that `read_to_string` can block - // indefinitely in some cases + // We read stderr in another thread as it seems that lines() can block in some cases let (err_sender, err_receiver) = mpsc::channel(); thread::spawn(move || { - let mut err = String::new(); - shard_stderr_reader.read_to_string(&mut err).unwrap(); - err_sender.send(err).unwrap_or(()); + for line in shard_stderr_reader.lines().flatten() { + err_sender.send(line).unwrap_or(()); + } }); + let mut err = String::new(); + while let Ok(line) = err_receiver.recv_timeout(Duration::from_millis(10)) { + err = err + "\n" + &line; + } - let err = err_receiver - .recv_timeout(Duration::from_millis(100)) - .map_err(|err| { - tracing::error!("Unable to read shard {rank} error from stderr"); - err - }) - .ok(); + tracing::error!("Shard complete standard error output:\n{err}"); if let Some(signal) = exit_status.signal() { tracing::error!("Shard process was signaled to shutdown with signal {signal}"); } - status_sender - .send(ShardStatus::Failed((rank, err))) - .unwrap(); + status_sender.send(ShardStatus::Failed(rank)).unwrap(); return; } @@ -580,6 +572,23 @@ impl PythonLogMessage { } } +impl TryFrom<&String> for PythonLogMessage { + type Error = serde_json::Error; + + fn try_from(value: &String) -> Result { + serde_json::from_str::(value) + } +} + +fn log_lines(lines: Lines) { + for line in lines.flatten() { + match PythonLogMessage::try_from(&line) { + Ok(log) => log.trace(), + Err(_) => tracing::debug!("{line}"), + } + } +} + fn find_num_shards( sharded: Option, num_shard: Option, @@ -633,6 +642,9 @@ enum LauncherError { } fn download_convert_model(args: &Args, running: Arc) -> Result<(), LauncherError> { + // Enter download tracing span + let _span = tracing::span!(tracing::Level::INFO, "download").entered(); + let mut download_args = vec![ "download-weights".to_string(), args.model_id.to_string(), @@ -694,6 +706,8 @@ fn download_convert_model(args: &Args, running: Arc) -> Result<(), L if err.kind() == io::ErrorKind::NotFound { tracing::error!("text-generation-server not found in PATH"); tracing::error!("Please install it with `make install-server`") + } else { + tracing::error!("{}", err); } return Err(LauncherError::DownloadError); @@ -702,16 +716,10 @@ fn download_convert_model(args: &Args, running: Arc) -> Result<(), L // Redirect STDOUT to the console let download_stdout = download_process.stdout.take().unwrap(); + let stdout = BufReader::new(download_stdout); + thread::spawn(move || { - // Enter download tracing span - let stdout = BufReader::new(download_stdout); - let _span = tracing::span!(tracing::Level::INFO, "download").entered(); - for line in stdout.lines() { - // Parse loguru logs - if let Ok(log) = serde_json::from_str::(&line.unwrap()) { - log.trace(); - } - } + log_lines(stdout.lines()); }); loop { @@ -816,11 +824,8 @@ fn spawn_shards( Err(TryRecvError::Empty) => { sleep(Duration::from_millis(100)); } - Ok(ShardStatus::Failed((rank, err))) => { + Ok(ShardStatus::Failed(rank)) => { tracing::error!("Shard {rank} failed to start"); - if let Some(err) = err { - tracing::error!("{err}"); - } shutdown_shards(shutdown, shutdown_receiver); return Err(LauncherError::ShardCannotStart); } @@ -996,10 +1001,20 @@ fn main() -> Result<(), LauncherError> { // Pattern match configuration let args = Args::parse(); + // Filter events with LOG_LEVEL + let env_filter = + EnvFilter::try_from_env("LOG_LEVEL").unwrap_or_else(|_| EnvFilter::new("info")); + if args.json_output { - tracing_subscriber::fmt().json().init(); + tracing_subscriber::fmt() + .with_env_filter(env_filter) + .json() + .init(); } else { - tracing_subscriber::fmt().compact().init(); + tracing_subscriber::fmt() + .with_env_filter(env_filter) + .compact() + .init(); } if args.env { @@ -1102,11 +1117,8 @@ fn main() -> Result<(), LauncherError> { let mut exit_code = Ok(()); while running.load(Ordering::SeqCst) { - if let Ok(ShardStatus::Failed((rank, err))) = status_receiver.try_recv() { + if let Ok(ShardStatus::Failed(rank)) = status_receiver.try_recv() { tracing::error!("Shard {rank} crashed"); - if let Some(err) = err { - tracing::error!("{err}"); - } exit_code = Err(LauncherError::ShardFailed); break; };