From 2d9465d181e0778a5456e5d99503264c98318f65 Mon Sep 17 00:00:00 2001
From: Morgan Funtowicz
Date: Fri, 22 Nov 2024 14:02:58 +0100
Subject: [PATCH] misc(backend): allow rebinding numa core affinity

---
 backends/llamacpp/csrc/backend.cpp |  1 -
 backends/llamacpp/csrc/ffi.hpp     | 10 +++++++++-
 backends/llamacpp/src/backend.rs   | 21 +++++----------------
 backends/llamacpp/src/lib.rs       |  3 ++-
 4 files changed, 16 insertions(+), 19 deletions(-)

diff --git a/backends/llamacpp/csrc/backend.cpp b/backends/llamacpp/csrc/backend.cpp
index 54f1cf73..b60c3ddc 100644
--- a/backends/llamacpp/csrc/backend.cpp
+++ b/backends/llamacpp/csrc/backend.cpp
@@ -3,7 +3,6 @@
 //
 
 #include
-#include
 #include
 #include
 
diff --git a/backends/llamacpp/csrc/ffi.hpp b/backends/llamacpp/csrc/ffi.hpp
index f9eec781..d33a4c7b 100644
--- a/backends/llamacpp/csrc/ffi.hpp
+++ b/backends/llamacpp/csrc/ffi.hpp
@@ -111,7 +111,7 @@ namespace huggingface::tgi::backends::llamacpp {
     struct numa_cpumask_deleter { void operator()(struct bitmask* cpumask){ numa_free_cpumask(cpumask); }};
     typedef std::unique_ptr<struct bitmask, numa_cpumask_deleter> unique_cpumask_ptr;
 
-    void set_numactl_core_affinity(rust::Slice<const size_t> affinity) {
+    void set_numa_core_affinity(rust::Slice<const size_t> affinity) {
 //    void set_numactl_core_affinity(std::vector<size_t> affinity) {
 #ifdef NUMA_AVAILABLE
         if(numa_available()) {
@@ -173,6 +173,14 @@ namespace huggingface::tgi::backends::llamacpp {
             SPDLOG_WARN("TGI's llama.cpp backend was compiled without NUMA support");
 #endif
     }
+
+    /**
+     * Re-run llama.cpp's NUMA initialization so the core affinity previously set for the calling thread is taken into account.
+     */
+    void update_numa_affinity() {
+        SPDLOG_INFO("Rebinding NUMA affinity for current worker on thread: {}", std::this_thread::get_id());
+        llama_numa_init(ggml_numa_strategy::GGML_NUMA_STRATEGY_NUMACTL);
+    }
 
 }
 
diff --git a/backends/llamacpp/src/backend.rs b/backends/llamacpp/src/backend.rs
index 5bcb913b..709e5d42 100644
--- a/backends/llamacpp/src/backend.rs
+++ b/backends/llamacpp/src/backend.rs
@@ -1,6 +1,6 @@
 use crate::ffi::{
-    create_worker_frontend, set_numactl_core_affinity, GenerationParams, LlamaCppWorkerFrontend,
-    SamplingParams,
+    create_worker_frontend, set_numa_core_affinity, update_numa_affinity, GenerationParams,
+    LlamaCppWorkerFrontend, SamplingParams,
 };
 use async_channel::{unbounded as mpmc_unbounded, Receiver as MpmcReceiver, Sender as MpmcSender};
 use async_trait::async_trait;
@@ -8,7 +8,6 @@ use cxx::UniquePtr;
 use log::warn;
 use std::ops::Range;
 use std::path::{Path, PathBuf};
-use std::sync::mpsc::{channel, Receiver, Sender};
 use std::sync::Arc;
 use std::thread::spawn;
 use text_generation_router::infer::InferError::GenerationError;
@@ -25,17 +24,6 @@ use tokio::time::Instant;
 use tokio_stream::wrappers::UnboundedReceiverStream;
 use tracing::{debug, error, info};
 
-macro_rules! send_or_warn {
-    ($send: expr, $err: expr) => {
-        if let Err(se) = $send.send(err) {
-            warn!(
-                "Failed to send message back to the user: {}. Originating error: {}",
-                se, e
-            );
-        }
-    };
-}
-
 fn get_num_cores() -> usize {
     match option_env!("TGI_USE_PHYSICAL_CORES")
         .unwrap_or("OFF")
@@ -272,8 +260,9 @@ fn worker_loop(
     // This loop will mostly decode single token at every step, so no need to rely on parallelism
     tokenizers::utils::parallelism::set_parallelism(false);
 
-    // Bind cores for the current thread
-    set_numactl_core_affinity(&affinity);
+    // Bind cores for the current thread and make sure it's taken into account
+    set_numa_core_affinity(&affinity);
+    update_numa_affinity();
 
     loop {
         if let Ok((generation, stream)) = backlog.recv_blocking() {
diff --git a/backends/llamacpp/src/lib.rs b/backends/llamacpp/src/lib.rs
index 6b047bf5..e06220f2 100644
--- a/backends/llamacpp/src/lib.rs
+++ b/backends/llamacpp/src/lib.rs
@@ -54,7 +54,8 @@ mod ffi {
             num_threads: u32,
         ) -> Result<UniquePtr<LlamaCppWorkerFrontend>>;
 
-        fn set_numactl_core_affinity(affinity: &[usize]);
+        fn set_numa_core_affinity(affinity: &[usize]);
+        fn update_numa_affinity();
 
         unsafe fn stream(
             self: Pin<&mut LlamaCppWorkerFrontend>,