Add support for stop words in TRTLLM (#2678)
* feat(trtllm): rewrite health to not account for current state * chore(looper): cleanup a bit more * feat(post_processing): max_new_tokens is const evaluated now * chore(ffi):formatting * feat(trtllm): add stop words handling # Conflicts: # backends/trtllm/lib/backend.cpp * chore(trtllm): create specific parallelconfig factory and logging init methods * chore(trtllm): define a macro for SizeType cast * chore(trtllm): use GetParallelConfig * chore(trtllm): minor refactoring * chore(trtllm): validate there are enough GPus on the system for the desired model * chore(trtllm): ensure max throughput scheduling policy is selected * chore(trtllm): minor fix * chore(router): minor refactorings * feat(docker): build with-slurm ompi * feat(docker): add python3.10 dev to runtime deps * chore(docker): add mpi to ld_library_path * chore(docker): install transformers * feat(trtllm): detect stop_words from generation_config.json
This commit is contained in:
parent
db68bd0524
commit
ba5fc7d922
|
@ -43,7 +43,7 @@ RUN wget "https://download.open-mpi.org/release/open-mpi/v4.1/$OMPI_TARBALL_FILE
|
|||
mkdir /usr/src/mpi && \
|
||||
tar -xf "/opt/src/$OMPI_TARBALL_FILENAME" -C /usr/src/mpi --strip-components=1 && \
|
||||
cd /usr/src/mpi && \
|
||||
./configure --prefix=/usr/local/mpi --with-cuda=/usr/local/cuda && \
|
||||
./configure --prefix=/usr/local/mpi --with-cuda=/usr/local/cuda --with-slurm && \
|
||||
make -j all && \
|
||||
make install && \
|
||||
rm -rf "/opt/src/$OMPI_TARBALL_FILENAME"
|
||||
|
@ -84,12 +84,13 @@ RUN mkdir $TGI_INSTALL_PREFIX && mkdir "$TGI_INSTALL_PREFIX/include" && mkdir "$
|
|||
CMAKE_INSTALL_PREFIX=$TGI_INSTALL_PREFIX cargo build --release
|
||||
|
||||
FROM nvidia/cuda:12.6.1-cudnn-runtime-ubuntu22.04 AS runtime
|
||||
RUN apt update && apt install -y python3 && \
|
||||
rm -rf /var/lib/{apt,dpkg,cache,log}/
|
||||
RUN apt update && apt install -y python3-minimal python3-dev python3-pip && \
|
||||
rm -rf /var/lib/{apt,dpkg,cache,log}/ && \
|
||||
python3 -m pip install transformers tokenizers
|
||||
|
||||
WORKDIR /usr/local/tgi/bin
|
||||
|
||||
ENV LD_LIBRARY_PATH="/usr/local/tgi/lib:/usr/local/tensorrt/lib:/usr/local/cuda/lib64/stubs:$LD_LIBRARY_PATH"
|
||||
ENV LD_LIBRARY_PATH="/usr/local/tgi/lib:/usr/local/mpi/lib:/usr/local/tensorrt/lib:/usr/local/cuda/lib64/stubs:$LD_LIBRARY_PATH"
|
||||
ENV TOKENIZERS_PARALLELISM=false
|
||||
ENV OMPI_MCA_plm_rsh_agent=""
|
||||
|
||||
|
|
|
@ -5,6 +5,7 @@
|
|||
#ifndef TGI_TRTLLM_BACKEND_H
|
||||
#define TGI_TRTLLM_BACKEND_H
|
||||
|
||||
#include <array>
|
||||
#include <cmath>
|
||||
#include <filesystem>
|
||||
#include <span>
|
||||
|
@ -19,11 +20,16 @@
|
|||
using json = nlohmann::json;
|
||||
namespace tle = tensorrt_llm::executor;
|
||||
|
||||
|
||||
#define CAST_SIZETYPE(x) static_cast<tle::SizeType32>(x)
|
||||
|
||||
namespace huggingface::tgi::backends {
|
||||
using RequestId = tle::IdType;
|
||||
using TokenId = tle::TokenIdType;
|
||||
|
||||
const static auto OUTPUT_CONFIG = tle::OutputConfig(true, false, false, true, false);
|
||||
constexpr auto FMT_NOT_ENOUGH_GPUS = FMT_STRING(
|
||||
"Not enough GPUs to allocate requested model (detected: {:d}, required: {:d})");
|
||||
constexpr auto FMT_EXECUTOR_STATS = FMT_STRING(
|
||||
"Submitting inference [{}] to the executor ({:d} already in-flight)");
|
||||
constexpr auto FMT_SAMPLING_CONFIG = FMT_STRING(
|
||||
|
@ -35,6 +41,12 @@ namespace huggingface::tgi::backends {
|
|||
*/
|
||||
void InitializeBackend();
|
||||
|
||||
/**
|
||||
* Initialize logging mechanism
|
||||
*/
|
||||
void InitializeLogging();
|
||||
|
||||
|
||||
/**
|
||||
*
|
||||
* @param config TensorRT-LLM configuration object
|
||||
|
@ -43,6 +55,14 @@ namespace huggingface::tgi::backends {
|
|||
*/
|
||||
tle::ExecutorConfig GetExecutorConfig(const json &config, const std::string &workerPath);
|
||||
|
||||
/**
|
||||
*
|
||||
* @param worldSize
|
||||
* @param workerPath
|
||||
* @return
|
||||
*/
|
||||
tle::ParallelConfig GetParallelConfig(size_t worldSize, std::string workerPath) noexcept;
|
||||
|
||||
/**
|
||||
* Get the sampling configuration from the parameters provided by TGI
|
||||
* @param topK
|
||||
|
@ -62,6 +82,14 @@ namespace huggingface::tgi::backends {
|
|||
uint64_t seed
|
||||
) noexcept;
|
||||
|
||||
/**
|
||||
* Attempt to retrieve the
|
||||
* @param generationConfigPath
|
||||
* @return
|
||||
*/
|
||||
std::optional<std::list<std::vector<TokenId>>>
|
||||
GetStopWordsFromConfig(const std::filesystem::path &generationConfigPath) noexcept;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
|
@ -72,6 +100,7 @@ namespace huggingface::tgi::backends {
|
|||
|
||||
/** Frequently accessed variables cached here **/
|
||||
uint32_t maxNumTokens;
|
||||
std::list<std::vector<TokenId>> stopWords;
|
||||
|
||||
public:
|
||||
explicit TensorRtLlmBackend(
|
||||
|
@ -91,20 +120,20 @@ namespace huggingface::tgi::backends {
|
|||
* @param topK
|
||||
* @param topP
|
||||
* @param temperature
|
||||
* @param repetition_penalty
|
||||
* @param frequency_penalty
|
||||
* @param repetitionPenalty
|
||||
* @param frequencyPenalty
|
||||
* @param seed
|
||||
* @return Request id related to this generation for reference
|
||||
*/
|
||||
[[nodiscard]] RequestId Submit(
|
||||
const std::vector<TokenId> &tokens,
|
||||
const uint32_t maxNewTokens,
|
||||
const int32_t topK,
|
||||
const float_t topP,
|
||||
const float_t temperature,
|
||||
const float_t repetition_penalty,
|
||||
const float_t frequency_penalty,
|
||||
const uint64_t seed
|
||||
uint32_t maxNewTokens,
|
||||
int32_t topK,
|
||||
float_t topP,
|
||||
float_t temperature,
|
||||
float_t repetitionPenalty,
|
||||
float_t frequencyPenalty,
|
||||
uint64_t seed
|
||||
);
|
||||
|
||||
[[nodiscard]] std::vector<tle::Response> PullNewTokens();
|
||||
|
|
|
@ -23,9 +23,9 @@ namespace huggingface::hardware::cuda {
|
|||
int32_t major;
|
||||
int32_t minor;
|
||||
|
||||
[[nodiscard]] constexpr bool isPostAmpere() const { return major >= AMPERE_SM_MAJOR; }
|
||||
[[nodiscard]] constexpr bool IsPostAmpere() const { return major >= AMPERE_SM_MAJOR; }
|
||||
|
||||
[[nodiscard]] constexpr bool isPostHopper() const { return major >= HOPPER_SM_MAJOR; }
|
||||
[[nodiscard]] constexpr bool IsPostHopper() const { return major >= HOPPER_SM_MAJOR; }
|
||||
};
|
||||
|
||||
CudaComputeCapabilities GetCudaComputeCapabilities() {
|
||||
|
|
|
@ -8,7 +8,9 @@
|
|||
#include "backend.h"
|
||||
#include "hardware.h"
|
||||
|
||||
void huggingface::tgi::backends::InitializeBackend() {
|
||||
|
||||
void huggingface::tgi::backends::InitializeLogging() {
|
||||
#ifdef NDEBUG
|
||||
if (const auto TRTLLM_LOG_LEVEL_CSTR = std::getenv("TRTLLM_LOG_LEVEL")) {
|
||||
std::string log_level(TRTLLM_LOG_LEVEL_CSTR);
|
||||
std::transform(log_level.begin(), log_level.end(), log_level.begin(), [](unsigned char c) {
|
||||
|
@ -20,11 +22,18 @@ void huggingface::tgi::backends::InitializeBackend() {
|
|||
else
|
||||
spdlog::set_level(spdlog::level::info);
|
||||
}
|
||||
#else
|
||||
spdlog::set_level(spdlog::level::debug);
|
||||
#endif
|
||||
}
|
||||
|
||||
void huggingface::tgi::backends::InitializeBackend() {
|
||||
SPDLOG_INFO("Initializing Backend...");
|
||||
nvmlInit_v2();
|
||||
initTrtLlmPlugins();
|
||||
|
||||
InitializeLogging();
|
||||
|
||||
SPDLOG_INFO("Backend Executor Version: {}", tle::version());
|
||||
const auto numGpus = huggingface::hardware::cuda::GetNumDevices();
|
||||
if (numGpus.has_value()) {
|
||||
|
@ -34,6 +43,23 @@ void huggingface::tgi::backends::InitializeBackend() {
|
|||
}
|
||||
}
|
||||
|
||||
[[nodiscard]]
|
||||
tle::ParallelConfig
|
||||
huggingface::tgi::backends::GetParallelConfig(const size_t worldSize, const std::string workerPath) noexcept {
|
||||
auto mode = tle::CommunicationMode::kLEADER;
|
||||
std::optional<tle::OrchestratorConfig> orchestratorConfig = std::nullopt;
|
||||
|
||||
if (worldSize > 1) {
|
||||
SPDLOG_INFO("Detected sharded engine deployment, using orchestrator mode");
|
||||
mode = tle::CommunicationMode::kORCHESTRATOR;
|
||||
orchestratorConfig = std::make_optional<tle::OrchestratorConfig>(true, workerPath, nullptr, true);
|
||||
} else {
|
||||
SPDLOG_INFO("Detected single engine deployment, using leader mode");
|
||||
}
|
||||
|
||||
return tle::ParallelConfig(tle::CommunicationType::kMPI, mode, std::nullopt, std::nullopt, orchestratorConfig);
|
||||
}
|
||||
|
||||
[[nodiscard]]
|
||||
tle::ExecutorConfig huggingface::tgi::backends::GetExecutorConfig(const json &config, const std::string &workerPath) {
|
||||
tle::ExecutorConfig execConfig(/* maxBeamWidth = */ 1);
|
||||
|
@ -42,29 +68,13 @@ tle::ExecutorConfig huggingface::tgi::backends::GetExecutorConfig(const json &co
|
|||
const auto computeCapabilities = huggingface::hardware::cuda::GetCudaComputeCapabilities();
|
||||
|
||||
// Single engine (TP = PP = 1) -> using leader mode (no MPI involved)
|
||||
if (config["/pretrained_config/mapping/world_size"_json_pointer].get<uint8_t>() == 1) {
|
||||
SPDLOG_INFO("Detected single engine deployment, using leader mode");
|
||||
execConfig.setParallelConfig(tle::ParallelConfig(
|
||||
tle::CommunicationType::kMPI,
|
||||
tle::CommunicationMode::kLEADER,
|
||||
std::nullopt,
|
||||
std::nullopt,
|
||||
std::nullopt
|
||||
));
|
||||
} else { // Multiple engines -> using orchestrator mode (MPI involved)
|
||||
SPDLOG_INFO("Detected sharded engine deployment, using orchestrator mode");
|
||||
execConfig.setParallelConfig(tle::ParallelConfig(
|
||||
tle::CommunicationType::kMPI,
|
||||
tle::CommunicationMode::kORCHESTRATOR,
|
||||
std::nullopt,
|
||||
std::nullopt,
|
||||
tle::OrchestratorConfig(true, workerPath, nullptr, true)
|
||||
));
|
||||
}
|
||||
const auto worldSize = config["/pretrained_config/mapping/world_size"_json_pointer].get<size_t>();
|
||||
execConfig.setParallelConfig(GetParallelConfig(worldSize, workerPath));
|
||||
|
||||
// Define some configuration variables
|
||||
execConfig.setKvCacheConfig(tle::KvCacheConfig(true));
|
||||
execConfig.setEnableChunkedContext(computeCapabilities.isPostAmpere());
|
||||
execConfig.setEnableChunkedContext(computeCapabilities.IsPostAmpere());
|
||||
execConfig.setSchedulerConfig(tle::SchedulerConfig(tle::CapacitySchedulerPolicy::kMAX_UTILIZATION));
|
||||
return execConfig;
|
||||
}
|
||||
|
||||
|
@ -93,6 +103,31 @@ tle::SamplingConfig huggingface::tgi::backends::GetSamplingConfig(
|
|||
);
|
||||
}
|
||||
|
||||
std::optional<std::list<std::vector<huggingface::tgi::backends::TokenId>>>
|
||||
huggingface::tgi::backends::GetStopWordsFromConfig(
|
||||
const std::filesystem::path &generationConfigPath) noexcept {
|
||||
if (exists(generationConfigPath)) {
|
||||
const auto generationConfig = json::parse(std::ifstream(generationConfigPath));
|
||||
if (const auto eosTokenIds = generationConfig["/eos_token_id"_json_pointer]; eosTokenIds.is_array()) {
|
||||
SPDLOG_INFO(FMT_STRING("Found {:d} EOS tokens"), eosTokenIds.size());
|
||||
std::list<std::vector<huggingface::tgi::backends::TokenId>> stopWords(eosTokenIds.size());
|
||||
|
||||
const auto to_single_token = [](const auto tokenIdObj) -> decltype(stopWords)::value_type {
|
||||
return {tokenIdObj.template get<tle::TokenIdType>()};
|
||||
};
|
||||
|
||||
std::transform(eosTokenIds.cbegin(), eosTokenIds.cend(), stopWords.begin(), to_single_token);
|
||||
return stopWords;
|
||||
} else {
|
||||
SPDLOG_INFO("Invalid EOS tokens entry found (not an array)");
|
||||
}
|
||||
} else {
|
||||
SPDLOG_INFO("No EOS tokens found, generation_config.json doesn't exist");
|
||||
}
|
||||
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
huggingface::tgi::backends::TensorRtLlmBackend::TensorRtLlmBackend(
|
||||
const std::filesystem::path &enginesFolder,
|
||||
const std::filesystem::path &executorWorker
|
||||
|
@ -100,21 +135,34 @@ huggingface::tgi::backends::TensorRtLlmBackend::TensorRtLlmBackend(
|
|||
config(json::parse(std::ifstream(enginesFolder / "config.json"))),
|
||||
executor(enginesFolder, tensorrt_llm::executor::ModelType::kDECODER_ONLY,
|
||||
GetExecutorConfig(config, executorWorker.string())) {
|
||||
SPDLOG_INFO(FMT_STRING("Engine (version={})"), config["/version"_json_pointer].get_ref<const std::string &>());
|
||||
|
||||
SPDLOG_INFO(FMT_STRING("Engine (version={})"), config["/version"_json_pointer].get<std::string_view>());
|
||||
|
||||
// Ensure we have enough GPUs on the system
|
||||
const auto worldSize = config["/pretrained_config/mapping/world_size"_json_pointer].get<size_t>();
|
||||
const auto numGpus = huggingface::hardware::cuda::GetNumDevices().value_or(0);
|
||||
if (numGpus < worldSize) {
|
||||
SPDLOG_CRITICAL(FMT_NOT_ENOUGH_GPUS, numGpus, worldSize);
|
||||
// todo : raise exception to catch on rust side
|
||||
}
|
||||
|
||||
// Cache variables
|
||||
maxNumTokens = config["/build_config/max_num_tokens"_json_pointer].get<uint32_t>();
|
||||
|
||||
// Attempt to discover stopWords from the generation_config.json
|
||||
const auto generationConfigPath = enginesFolder / "generation_config.json";
|
||||
stopWords = GetStopWordsFromConfig(generationConfigPath).value_or(std::list<std::vector<TokenId>>());
|
||||
}
|
||||
|
||||
[[nodiscard("Returned number of requests needs to be consumed")]]
|
||||
size_t huggingface::tgi::backends::TensorRtLlmBackend::NumResponsesReady() const {
|
||||
#ifdef NDEBUG
|
||||
return executor.getNumResponsesReady();
|
||||
#else
|
||||
const auto numResponses = executor.getNumResponsesReady();
|
||||
|
||||
#ifndef NDEBUG
|
||||
if(numResponses > 0) SPDLOG_INFO(FMT_STRING("Num responses ready: {:d}"), numResponses);
|
||||
#endif
|
||||
|
||||
if (numResponses > 0) SPDLOG_INFO(FMT_STRING("Num responses ready: {:d}"), numResponses);
|
||||
return numResponses;
|
||||
#endif
|
||||
}
|
||||
|
||||
[[nodiscard("Returned request id needs to be provided back to gather generated tokens")]]
|
||||
|
@ -124,8 +172,8 @@ tle::IdType huggingface::tgi::backends::TensorRtLlmBackend::Submit(
|
|||
const int32_t topK,
|
||||
const float_t topP,
|
||||
const float_t temperature,
|
||||
const float_t repetition_penalty,
|
||||
const float_t frequency_penalty,
|
||||
const float_t repetitionPenalty,
|
||||
const float_t frequencyPenalty,
|
||||
const uint64_t seed
|
||||
) {
|
||||
const auto maxNewTokensChecked = std::min(maxNewTokens, static_cast<uint32_t>(maxNumTokens - tokens.size()));
|
||||
|
@ -135,14 +183,19 @@ tle::IdType huggingface::tgi::backends::TensorRtLlmBackend::Submit(
|
|||
const auto &lastIteration = iterations.front();
|
||||
|
||||
SPDLOG_DEBUG(FMT_EXECUTOR_STATS, fmt::join(tokens, ", "), lastIteration.numActiveRequests);
|
||||
SPDLOG_DEBUG(FMT_SAMPLING_CONFIG, topK, topP, temperature, repetition_penalty, frequency_penalty, seed);
|
||||
SPDLOG_DEBUG(FMT_SAMPLING_CONFIG, topK, topP, temperature, repetitionPenalty, frequencyPenalty, seed);
|
||||
SPDLOG_DEBUG(FMT_STRING("Asking for max_new_tokens={:d}"), maxNewTokensChecked);
|
||||
}
|
||||
#endif
|
||||
|
||||
const auto sampling = GetSamplingConfig(topK, topP, temperature, repetition_penalty, frequency_penalty, seed);
|
||||
const auto maxNewTokensChecked_ = static_cast<tle::SizeType32>(maxNewTokensChecked);
|
||||
return executor.enqueueRequest(tle::Request{tokens, maxNewTokensChecked_, true, sampling, OUTPUT_CONFIG});
|
||||
const auto sampling = GetSamplingConfig(topK, topP, temperature, repetitionPenalty, frequencyPenalty, seed);
|
||||
|
||||
// Build the request
|
||||
auto request = tle::Request{tokens, CAST_SIZETYPE(maxNewTokensChecked), true, sampling, OUTPUT_CONFIG};
|
||||
request.setStopWords(stopWords);
|
||||
|
||||
// Submit to the executor for batching
|
||||
return executor.enqueueRequest(request);
|
||||
}
|
||||
|
||||
std::vector<tle::Response> huggingface::tgi::backends::TensorRtLlmBackend::PullNewTokens() {
|
||||
|
|
|
@ -23,9 +23,14 @@ huggingface::tgi::backends::TensorRtLlmBackendImpl::TensorRtLlmBackendImpl(
|
|||
|
||||
|
||||
uint64_t huggingface::tgi::backends::TensorRtLlmBackendImpl::Submit(
|
||||
rust::Slice<const uint32_t> tokens, uint32_t maxNewTokens,
|
||||
int32_t topK, float_t topP, float_t temperature,
|
||||
float_t repetition_penalty, float_t frequency_penalty, uint64_t seed) {
|
||||
rust::Slice<const uint32_t> tokens,
|
||||
uint32_t maxNewTokens,
|
||||
int32_t topK,
|
||||
float_t topP,
|
||||
float_t temperature,
|
||||
float_t repetition_penalty,
|
||||
float_t frequency_penalty,
|
||||
uint64_t seed) {
|
||||
|
||||
// This will copy all the items from the initial slice
|
||||
std::vector<int32_t> tokens_(tokens.begin(), tokens.end());
|
||||
|
|
|
@ -27,11 +27,6 @@ use crate::utils::first_line;
|
|||
|
||||
type InferResult<T> = Result<T, InferError>;
|
||||
|
||||
struct IdentifiableRequest<T> {
|
||||
request_id: u64,
|
||||
inner: T,
|
||||
}
|
||||
|
||||
/// Wrap the requests along with the channel used to stream back to the client the decoded tokens
|
||||
struct GenerationContext {
|
||||
request: ValidGenerateRequest,
|
||||
|
@ -164,9 +159,8 @@ fn executor_status_looper(
|
|||
}
|
||||
}
|
||||
|
||||
fn post_processor_looper(
|
||||
fn post_processor_looper<const MAX_NUM_TOKENS: usize>(
|
||||
tokenizer: Tokenizer,
|
||||
max_num_tokens: usize,
|
||||
max_inflight_requests: usize,
|
||||
mut decoded_tokens: UnboundedReceiver<(u64, InferResult<DecodedTokenContext>)>,
|
||||
) {
|
||||
|
@ -185,7 +179,7 @@ fn post_processor_looper(
|
|||
.entry(request_id)
|
||||
.and_modify(|s| s.push(*&ctx.token.id))
|
||||
.or_insert_with(|| {
|
||||
let mut state = Vec::with_capacity(max_num_tokens);
|
||||
let mut state = Vec::with_capacity(MAX_NUM_TOKENS);
|
||||
state.push(*&ctx.token.id);
|
||||
state
|
||||
});
|
||||
|
@ -319,12 +313,7 @@ impl TensorRtLlmBackendV2 {
|
|||
|
||||
// Post processor looper is responsible from receiving a bunch of tokens, decoding them and sending them back to the user
|
||||
let post_processor_looper = spawn_blocking(move || {
|
||||
post_processor_looper(
|
||||
tokenizer,
|
||||
512,
|
||||
max_inflight_requests,
|
||||
post_processor_receiver,
|
||||
)
|
||||
post_processor_looper::<256>(tokenizer, max_inflight_requests, post_processor_receiver)
|
||||
});
|
||||
|
||||
Ok(TensorRtLlmBackendV2 {
|
||||
|
@ -387,9 +376,7 @@ impl Backend for TensorRtLlmBackendV2 {
|
|||
}
|
||||
}
|
||||
|
||||
async fn health(&self, current_health: bool) -> bool {
|
||||
current_health
|
||||
& !self.executor_looper.is_finished()
|
||||
& !self.post_processor_looper.is_finished()
|
||||
async fn health(&self, _: bool) -> bool {
|
||||
!self.executor_looper.is_finished() & !self.post_processor_looper.is_finished()
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue