From 5335bf973b2fef2c592a3061ccbf9e5c4fec7ab7 Mon Sep 17 00:00:00 2001 From: Morgan Funtowicz Date: Thu, 21 Nov 2024 00:03:05 +0100 Subject: [PATCH] feat(backend): multistream inference on CPU --- Cargo.lock | 1 + backends/llamacpp/CMakeLists.txt | 6 + backends/llamacpp/Cargo.toml | 1 + backends/llamacpp/build.rs | 5 +- backends/llamacpp/cmake/numa.cmake | 20 ++++ backends/llamacpp/csrc/backend.cpp | 2 +- backends/llamacpp/csrc/ffi.hpp | 23 ++++ backends/llamacpp/src/backend.rs | 173 +++++++++++++++++++++++------ backends/llamacpp/src/lib.rs | 2 + 9 files changed, 198 insertions(+), 35 deletions(-) create mode 100644 backends/llamacpp/cmake/numa.cmake diff --git a/Cargo.lock b/Cargo.lock index 6b6cb7a7..81b7c282 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4229,6 +4229,7 @@ dependencies = [ "log", "metrics", "metrics-exporter-prometheus", + "num_cpus", "pkg-config", "serde_json", "text-generation-router", diff --git a/backends/llamacpp/CMakeLists.txt b/backends/llamacpp/CMakeLists.txt index f6dd2db1..13107e0a 100644 --- a/backends/llamacpp/CMakeLists.txt +++ b/backends/llamacpp/CMakeLists.txt @@ -18,6 +18,7 @@ else () endif () # Add dependencies +include(cmake/numa.cmake) include(cmake/spdlog.cmake) if (${LLAMA_CPP_BUILD_CUDA}) @@ -40,6 +41,11 @@ fetchcontent_makeavailable(llama) add_library(tgi_llamacpp_backend_impl STATIC csrc/backend.hpp csrc/backend.cpp) target_compile_features(tgi_llamacpp_backend_impl PRIVATE cxx_std_11) target_link_libraries(tgi_llamacpp_backend_impl PUBLIC spdlog::spdlog llama) + +if (NUMA_FOUND) + target_link_libraries(tgi_llamacpp_backend_impl PUBLIC numa) +endif () + install(TARGETS tgi_llamacpp_backend_impl spdlog llama) if (${CMAKE_BUILD_TYPE} STREQUAL "Debug") diff --git a/backends/llamacpp/Cargo.toml b/backends/llamacpp/Cargo.toml index 48a0bb84..0a5039b3 100644 --- a/backends/llamacpp/Cargo.toml +++ b/backends/llamacpp/Cargo.toml @@ -9,6 +9,7 @@ homepage.workspace = true async-trait = "0.1" clap = { version = "4.5.19", features = ["derive"] } cxx = "1.0" +num_cpus = "1" hf-hub = { workspace = true } image = { version = "0.25.1", features = ["default-formats"] } metrics = { workspace = true } diff --git a/backends/llamacpp/build.rs b/backends/llamacpp/build.rs index 0e9f2ae9..22726db1 100644 --- a/backends/llamacpp/build.rs +++ b/backends/llamacpp/build.rs @@ -86,6 +86,7 @@ fn main() { // Emit linkage search path probe!("ompi", MPI_REQUIRED_VERSION); + probe!("numa", "2.0"); // Backend BACKEND_DEPS.iter().for_each(|name| { @@ -96,7 +97,9 @@ fn main() { println!("cargo:rustc-link-search=native={}", out_dir.display()); let spdlog_linkage_target = if is_debug { "spdlogd" } else { "spdlog" }; - println!("cargo:rustc-link-lib=static={spdlog_linkage_target}"); + let fmt_linkage_target = if is_debug { "fmtd" } else { "fmt" }; + println!("cargo:rustc-link-lib=dylib={spdlog_linkage_target}"); + println!("cargo:rustc-link-lib=dylib={fmt_linkage_target}"); println!("cargo:rustc-link-lib=dylib=ggml"); println!("cargo:rustc-link-lib=dylib=llama"); diff --git a/backends/llamacpp/cmake/numa.cmake b/backends/llamacpp/cmake/numa.cmake new file mode 100644 index 00000000..0399b752 --- /dev/null +++ b/backends/llamacpp/cmake/numa.cmake @@ -0,0 +1,20 @@ +# Find the numa policy library. +# Output variables: +# NUMA_INCLUDE_DIR : e.g., /usr/include/. +# NUMA_LIBRARY : Library path of numa library +# NUMA_FOUND : True if found. +FIND_PATH(NUMA_INCLUDE_DIR NAME numa.h + HINTS $ENV{HOME}/local/include /opt/local/include /usr/local/include /usr/include) + +FIND_LIBRARY(NUMA_LIBRARY NAME numa + HINTS $ENV{HOME}/local/lib64 $ENV{HOME}/local/lib /usr/local/lib64 /usr/local/lib /opt/local/lib64 /opt/local/lib /usr/lib64 /usr/lib +) + +IF (NUMA_INCLUDE_DIR AND NUMA_LIBRARY) + SET(NUMA_FOUND TRUE) + MESSAGE(STATUS "Found numa library: inc=${NUMA_INCLUDE_DIR}, lib=${NUMA_LIBRARY}") +ELSE () + SET(NUMA_FOUND FALSE) + MESSAGE(STATUS "WARNING: Numa library not found.") + MESSAGE(STATUS "Try: 'sudo apt-get install libnuma libnuma-dev' (or sudo yum install numactl numactl-devel)") +ENDIF () \ No newline at end of file diff --git a/backends/llamacpp/csrc/backend.cpp b/backends/llamacpp/csrc/backend.cpp index eb91e517..a30eb217 100644 --- a/backends/llamacpp/csrc/backend.cpp +++ b/backends/llamacpp/csrc/backend.cpp @@ -45,7 +45,7 @@ namespace huggingface::tgi::backends::llamacpp { #ifdef TGI_LLAMACPP_BACKEND_DEBUG char modelName[256]; llama_model_meta_val_str(model.get(), "general.name", modelName, sizeof(modelName)); - SPDLOG_DEBUG(FMT_STRING("Created llama.cpp backend for model: '{}'"), std::string_view(modelName)); + SPDLOG_DEBUG(FMT_STRING("Created llama.cpp backend for model: '{}'"), std::string_view(modelName)); #endif } diff --git a/backends/llamacpp/csrc/ffi.hpp b/backends/llamacpp/csrc/ffi.hpp index 43694fa3..9700f52e 100644 --- a/backends/llamacpp/csrc/ffi.hpp +++ b/backends/llamacpp/csrc/ffi.hpp @@ -5,13 +5,19 @@ #ifndef TGI_LLAMA_CPP_BACKEND_FFI_HPP #define TGI_LLAMA_CPP_BACKEND_FFI_HPP +#include #include #include #include #include #include +#include #include +#include +#include + +#include namespace huggingface::tgi::backends::llamacpp { class llama_cpp_worker_frontend_t; @@ -92,6 +98,23 @@ namespace huggingface::tgi::backends::llamacpp { auto *model = (llama_load_model_from_file(static_cast(modelPath).c_str(), params)); return std::make_unique(model); } + + void set_numactl_core_affinity(rust::Slice affinity) { + SPDLOG_INFO("Setting numactl cores affinity to {} for thread {}", affinity, std::this_thread::get_id()); +// auto nodes = std::unordered_set(); + auto cpumask = numa_allocate_cpumask(); + for(auto core : affinity) { + numa_bitmask_setbit(cpumask, core); + numa_sched_setaffinity(0, cpumask); + } + +//#ifdef TGI_LLAMACPP_BACKEND_DEBUG + auto cpumask_check = numa_allocate_cpumask(); + numa_sched_getaffinity(0, cpumask_check); + SPDLOG_DEBUG(FMT_STRING("numa_sched_affinity for thread {} -> {:b}"), std::this_thread::get_id(), *cpumask_check->maskp); +//#endif + + } } diff --git a/backends/llamacpp/src/backend.rs b/backends/llamacpp/src/backend.rs index dc29b707..fa5bfbab 100644 --- a/backends/llamacpp/src/backend.rs +++ b/backends/llamacpp/src/backend.rs @@ -1,13 +1,17 @@ use crate::ffi::{ - create_worker_frontend, GenerationParams, LlamaCppWorkerFrontend, SamplingParams, + create_worker_frontend, set_numactl_core_affinity, GenerationParams, LlamaCppWorkerFrontend, + SamplingParams, }; use async_trait::async_trait; use cxx::UniquePtr; -use std::ops::Deref; +use log::warn; +use std::cell::RefCell; +use std::ops::Range; use std::path::{Path, PathBuf}; use std::sync::mpsc::{channel, Receiver, Sender}; use std::sync::Arc; -use std::thread::{spawn, JoinHandle}; +use std::thread::spawn; +use text_generation_router::infer::InferError::GenerationError; use text_generation_router::infer::{Backend, GeneratedText, InferError, InferStreamResponse}; use text_generation_router::validation::{ ValidGenerateRequest, ValidParameters, ValidStoppingParameters, @@ -15,11 +19,41 @@ use text_generation_router::validation::{ use text_generation_router::{FinishReason, Token}; use thiserror::Error; use tokenizers::Tokenizer; -use tokio::sync::mpsc::{unbounded_channel, UnboundedSender}; +use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; +use tokio::sync::{Semaphore, SemaphorePermit, TryAcquireError}; +use tokio::task::JoinHandle; use tokio::time::Instant; use tokio_stream::wrappers::UnboundedReceiverStream; use tracing::{debug, error, info}; +macro_rules! send_or_warn { + ($send: expr, $err: expr) => { + if let Err(se) = $send.send(err) { + warn!( + "Failed to send message back to the user: {}. Originating error: {}", + se, e + ); + } + }; +} + +fn get_num_cores() -> usize { + match option_env!("TGI_USE_PHYSICAL_CORES") + .unwrap_or("OFF") + .to_uppercase() + .as_str() + { + "ON" => { + info!("Using only physical cores on the machine"); + num_cpus::get_physical() + } + _ => { + info!("Using physical and logical cores on the machine"); + num_cpus::get() + } + } +} + type InferResult = Result; unsafe impl Send for LlamaCppWorkerFrontend {} @@ -71,12 +105,19 @@ pub enum LlamaCppBackendError { struct LlamaCppWorker { sender: Sender<(GenerationContext, UnboundedSender)>, - handle: JoinHandle<()>, } -pub enum LlamaCppBackend { - Single(LlamaCppWorker), - // Multi(Vec) +impl LlamaCppWorker { + fn submit(&self, ctx: GenerationContext, sx: UnboundedSender) { + if let Err(err) = self.sender.send((ctx, sx)) { + // TODO: What do we do? + } + } +} + +pub struct LlamaCppBackend { + scheduler_sender: UnboundedSender<(GenerationContext, UnboundedSender)>, + scheduler_handle: JoinHandle<()>, } impl LlamaCppBackend { @@ -93,28 +134,67 @@ impl LlamaCppBackend { tokenizer: Arc, num_cores_per_instance: u16, ) -> Result { - let shared_path = Arc::new(model_path); - let path = shared_path.deref().as_ref(); + let path = model_path.as_ref(); if !path.exists() { return Err(LlamaCppBackendError::ModelFileDoesntExist( path.display().to_string(), )); } - let worker = match num_cores_per_instance { - 0 => { - let worker = Self::allocate_worker(path)?; - let (sender, receiver) = channel(); - let handle = spawn(move || scheduler_loop(worker, tokenizer, receiver)); - LlamaCppBackend::Single(LlamaCppWorker { sender, handle }) - } - _ => panic!("No supported yet"), - }; + let cores_allocation = get_cores_allocation(num_cores_per_instance as usize); - Ok(worker) + // Allocate all the workers + let streams = cores_allocation + .iter() + .map(|affinity| match Self::allocate_worker(path) { + Ok(worker) => { + let tokenizer = Arc::clone(&tokenizer); + let (sender, receiver) = channel(); + let affinity = affinity.clone().collect::>(); + spawn(move || worker_loop(worker, affinity, tokenizer, receiver)); + + Ok(LlamaCppWorker { sender }) + } + Err(e) => Err(e), + }) + .collect::, _>>()?; + + // Start the scheduler loop + let (scheduler_sender, scheduler_receiver) = unbounded_channel(); + let scheduler_handle = tokio::spawn(scheduler_loop(scheduler_receiver, streams)); + Ok(Self { + scheduler_sender, + scheduler_handle, + }) } } +fn get_cores_allocation(num_cores_per_instance: usize) -> Vec> { + // Get the total number of cores on the CPU + let cores_count = get_num_cores(); + + // Make sure each instance has some cores available + let mut effective_num_cores_per_instance = match num_cores_per_instance { + 0 => cores_count, + _ => num_cores_per_instance, + }; + + // If we have spare cores, let's see if we can give everyone one more core + let mut num_instances = cores_count / effective_num_cores_per_instance; + if cores_count - (num_instances * effective_num_cores_per_instance) >= num_instances { + effective_num_cores_per_instance = effective_num_cores_per_instance + 1; + warn!("Overriding cores allocation to {effective_num_cores_per_instance} per instance"); + } + + (0..num_instances) + .map(|ordinal| { + let start = ordinal * effective_num_cores_per_instance; + let end = (ordinal + 1) * effective_num_cores_per_instance - 1; + (start..end) + }) + .collect() +} + fn llama_generate_callback( ctx: *mut InferContext, new_token_id: u32, @@ -164,12 +244,12 @@ fn llama_generate_callback( start: ctx.start, queued: ctx.start, }), - Err(err) => Err(InferError::GenerationError(err.to_string())), + Err(err) => Err(GenerationError(err.to_string())), } } } } - Err(ref err) => Err(InferError::GenerationError(err.to_string())), + Err(ref err) => Err(GenerationError(err.to_string())), }; // Send back to the client @@ -179,14 +259,43 @@ fn llama_generate_callback( status.is_err() } -fn scheduler_loop( +async fn scheduler_loop( + mut queue: UnboundedReceiver<(GenerationContext, UnboundedSender)>, + mut workers: Vec, +) { + // Semaphore allows us to wait for a worker to become available + let permits = Semaphore::new(workers.len()); + + // Let's receive incoming requests + loop { + match queue.recv().await { + None => break, + Some((ctx, sender)) => { + let permit = permits.try_acquire(); + if let Err(err) = permit { + let _ = sender.send(Err(InferError::Overloaded(err))); + } + + // We can unwrap because we wouldn't have a semaphore available otherwise + let worker = workers.pop().unwrap(); + worker.submit(ctx, sender); + } + } + } +} + +fn worker_loop( mut backend: UniquePtr, + affinity: Vec, tokenizer: Arc, backlog: Receiver<(GenerationContext, UnboundedSender)>, ) { // This loop will mostly decode single token at every step, so no need to rely on parallelism tokenizers::utils::parallelism::set_parallelism(false); + // Bind cores for the current thread + set_numactl_core_affinity(&affinity); + loop { if let Ok((generation, stream)) = backlog.recv() { let start = Instant::now(); @@ -214,6 +323,7 @@ fn scheduler_loop( llama_generate_callback, ) { error!("Error while decoding tokens... {}", e.what()); + // TODO: What error to give back to the user? } // Make sure we re-keep track of the OpaqueStream box @@ -244,18 +354,15 @@ impl Backend for LlamaCppBackend { sampling_params, }; - match self { - LlamaCppBackend::Single(worker) => match worker.sender.send((ctx, sx)) { - Ok(_) => Ok(UnboundedReceiverStream::new(rx)), - Err(_) => Err(InferError::GenerationError( - "Failed to sent the request".to_string(), - )), - }, + // We send the workload to the scheduler + if let Err(e) = self.scheduler_sender.send((ctx, sx)) { + Err(InferError::IncompleteGenerationStream) + } else { + // We are returning the associated channel as early as we can, potentially closing it up + Ok(UnboundedReceiverStream::new(rx)) } } else { - Err(InferError::GenerationError( - "Unsupported modalities".to_string(), - )) + Err(GenerationError("Unsupported modalities".to_string())) } } diff --git a/backends/llamacpp/src/lib.rs b/backends/llamacpp/src/lib.rs index 8fc98955..f9fc72e5 100644 --- a/backends/llamacpp/src/lib.rs +++ b/backends/llamacpp/src/lib.rs @@ -51,6 +51,8 @@ mod ffi { fn create_worker_frontend(modelPath: &str) -> Result>; + fn set_numactl_core_affinity(affinity: &[usize]); + unsafe fn stream( self: Pin<&mut LlamaCppWorkerFrontend>, tokens: &[u32],