fea(launcher): debug logs (#623)

This commit is contained in:
OlivierDehaene 2023-07-17 19:03:07 +02:00 committed by GitHub
parent bc2873246c
commit 44acf72a73
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 62 additions and 50 deletions

View File

@ -13,7 +13,7 @@ nix = "0.26.2"
serde = { version = "1.0.152", features = ["derive"] }
serde_json = "1.0.93"
tracing = "0.1.37"
tracing-subscriber = { version = "0.3.16", features = ["json"] }
tracing-subscriber = { version = "0.3.16", features = ["json", "env-filter"] }
[dev-dependencies]
float_eq = "1.0.1"

View File

@ -4,7 +4,7 @@ use nix::unistd::Pid;
use serde::Deserialize;
use std::env;
use std::ffi::OsString;
use std::io::{BufRead, BufReader, Read};
use std::io::{BufRead, BufReader, Lines, Read};
use std::os::unix::process::{CommandExt, ExitStatusExt};
use std::path::Path;
use std::process::{Child, Command, ExitStatus, Stdio};
@ -15,6 +15,7 @@ use std::thread;
use std::thread::sleep;
use std::time::{Duration, Instant};
use std::{fs, io};
use tracing_subscriber::EnvFilter;
mod env_runtime;
@ -286,7 +287,7 @@ struct Args {
#[derive(Debug)]
enum ShardStatus {
Ready,
Failed((usize, Option<String>)),
Failed(usize),
}
#[allow(clippy::too_many_arguments)]
@ -311,6 +312,9 @@ fn shard_manager(
shutdown: Arc<AtomicBool>,
_shutdown_sender: mpsc::Sender<()>,
) {
// Enter shard-manager tracing span
let _span = tracing::span!(tracing::Level::INFO, "shard-manager", rank = rank).entered();
// Get UDS path
let uds_string = format!("{uds_path}-{rank}");
let uds = Path::new(&uds_string);
@ -438,30 +442,23 @@ fn shard_manager(
if err.kind() == io::ErrorKind::NotFound {
tracing::error!("text-generation-server not found in PATH");
tracing::error!("Please install it with `make install-server`")
} else {
}
{
tracing::error!("{}", err);
}
status_sender
.send(ShardStatus::Failed((rank, Some(err.to_string()))))
.unwrap();
status_sender.send(ShardStatus::Failed(rank)).unwrap();
return;
}
};
// Redirect STDOUT to the console
let shard_stdout_reader = BufReader::new(p.stdout.take().unwrap());
let mut shard_stderr_reader = BufReader::new(p.stderr.take().unwrap());
let shard_stderr_reader = BufReader::new(p.stderr.take().unwrap());
//stdout tracing thread
thread::spawn(move || {
// Enter shard-manager tracing span
let _span = tracing::span!(tracing::Level::INFO, "shard-manager", rank = rank).entered();
for line in shard_stdout_reader.lines() {
// Parse loguru logs
if let Ok(log) = serde_json::from_str::<PythonLogMessage>(&line.unwrap()) {
log.trace();
}
}
log_lines(shard_stdout_reader.lines());
});
let mut ready = false;
@ -470,30 +467,25 @@ fn shard_manager(
loop {
// Process exited
if let Some(exit_status) = p.try_wait().unwrap() {
// We read stderr in another thread as it seems that `read_to_string` can block
// indefinitely in some cases
// We read stderr in another thread as it seems that lines() can block in some cases
let (err_sender, err_receiver) = mpsc::channel();
thread::spawn(move || {
let mut err = String::new();
shard_stderr_reader.read_to_string(&mut err).unwrap();
err_sender.send(err).unwrap_or(());
for line in shard_stderr_reader.lines().flatten() {
err_sender.send(line).unwrap_or(());
}
});
let mut err = String::new();
while let Ok(line) = err_receiver.recv_timeout(Duration::from_millis(10)) {
err = err + "\n" + &line;
}
let err = err_receiver
.recv_timeout(Duration::from_millis(100))
.map_err(|err| {
tracing::error!("Unable to read shard {rank} error from stderr");
err
})
.ok();
tracing::error!("Shard complete standard error output:\n{err}");
if let Some(signal) = exit_status.signal() {
tracing::error!("Shard process was signaled to shutdown with signal {signal}");
}
status_sender
.send(ShardStatus::Failed((rank, err)))
.unwrap();
status_sender.send(ShardStatus::Failed(rank)).unwrap();
return;
}
@ -580,6 +572,23 @@ impl PythonLogMessage {
}
}
impl TryFrom<&String> for PythonLogMessage {
type Error = serde_json::Error;
fn try_from(value: &String) -> Result<Self, Self::Error> {
serde_json::from_str::<Self>(value)
}
}
fn log_lines<S: Sized + BufRead>(lines: Lines<S>) {
for line in lines.flatten() {
match PythonLogMessage::try_from(&line) {
Ok(log) => log.trace(),
Err(_) => tracing::debug!("{line}"),
}
}
}
fn find_num_shards(
sharded: Option<bool>,
num_shard: Option<usize>,
@ -633,6 +642,9 @@ enum LauncherError {
}
fn download_convert_model(args: &Args, running: Arc<AtomicBool>) -> Result<(), LauncherError> {
// Enter download tracing span
let _span = tracing::span!(tracing::Level::INFO, "download").entered();
let mut download_args = vec![
"download-weights".to_string(),
args.model_id.to_string(),
@ -694,6 +706,8 @@ fn download_convert_model(args: &Args, running: Arc<AtomicBool>) -> Result<(), L
if err.kind() == io::ErrorKind::NotFound {
tracing::error!("text-generation-server not found in PATH");
tracing::error!("Please install it with `make install-server`")
} else {
tracing::error!("{}", err);
}
return Err(LauncherError::DownloadError);
@ -702,16 +716,10 @@ fn download_convert_model(args: &Args, running: Arc<AtomicBool>) -> Result<(), L
// Redirect STDOUT to the console
let download_stdout = download_process.stdout.take().unwrap();
thread::spawn(move || {
// Enter download tracing span
let stdout = BufReader::new(download_stdout);
let _span = tracing::span!(tracing::Level::INFO, "download").entered();
for line in stdout.lines() {
// Parse loguru logs
if let Ok(log) = serde_json::from_str::<PythonLogMessage>(&line.unwrap()) {
log.trace();
}
}
thread::spawn(move || {
log_lines(stdout.lines());
});
loop {
@ -816,11 +824,8 @@ fn spawn_shards(
Err(TryRecvError::Empty) => {
sleep(Duration::from_millis(100));
}
Ok(ShardStatus::Failed((rank, err))) => {
Ok(ShardStatus::Failed(rank)) => {
tracing::error!("Shard {rank} failed to start");
if let Some(err) = err {
tracing::error!("{err}");
}
shutdown_shards(shutdown, shutdown_receiver);
return Err(LauncherError::ShardCannotStart);
}
@ -996,10 +1001,20 @@ fn main() -> Result<(), LauncherError> {
// Pattern match configuration
let args = Args::parse();
// Filter events with LOG_LEVEL
let env_filter =
EnvFilter::try_from_env("LOG_LEVEL").unwrap_or_else(|_| EnvFilter::new("info"));
if args.json_output {
tracing_subscriber::fmt().json().init();
tracing_subscriber::fmt()
.with_env_filter(env_filter)
.json()
.init();
} else {
tracing_subscriber::fmt().compact().init();
tracing_subscriber::fmt()
.with_env_filter(env_filter)
.compact()
.init();
}
if args.env {
@ -1102,11 +1117,8 @@ fn main() -> Result<(), LauncherError> {
let mut exit_code = Ok(());
while running.load(Ordering::SeqCst) {
if let Ok(ShardStatus::Failed((rank, err))) = status_receiver.try_recv() {
if let Ok(ShardStatus::Failed(rank)) = status_receiver.try_recv() {
tracing::error!("Shard {rank} crashed");
if let Some(err) = err {
tracing::error!("{err}");
}
exit_code = Err(LauncherError::ShardFailed);
break;
};