This commit is contained in:
drbh 2024-04-15 19:23:45 +02:00 committed by GitHub
commit bbffc85bfd
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 515 additions and 90 deletions

View File

@ -398,6 +398,15 @@ Options:
-e, --env
Display a lot of information about your runtime environment
```
## MAX_CLIENT_BATCH_SIZE
```shell
--max-client-batch-size <MAX_CLIENT_BATCH_SIZE>
Control the maximum number of inputs that a client can send in a single request
[env: MAX_CLIENT_BATCH_SIZE=]
[default: 4]
```
## HELP
```shell

View File

@ -0,0 +1,38 @@
{
"choices": [
{
"finish_reason": "eos_token",
"index": 1,
"logprobs": null,
"text": " PR for more information?"
},
{
"finish_reason": "length",
"index": 3,
"logprobs": null,
"text": "hd20220811-"
},
{
"finish_reason": "length",
"index": 0,
"logprobs": null,
"text": "le Business Incubator is providing a workspace"
},
{
"finish_reason": "length",
"index": 2,
"logprobs": null,
"text": " severely flawed and often has a substandard"
}
],
"created": 1712875413,
"id": "",
"model": "TinyLlama/TinyLlama-1.1B-Chat-v1.0",
"object": "text_completion",
"system_fingerprint": "1.4.5-native",
"usage": {
"completion_tokens": 36,
"prompt_tokens": 8,
"total_tokens": 44
}
}

View File

@ -0,0 +1 @@
"<ClientResponse(http://localhost:8041/v1/completions) [200 OK]>\n<CIMultiDictProxy('Content-Type': 'text/event-stream', 'Cache-Control': 'no-cache', 'x-compute-type': '1-nvidia-a10g', 'x-compute-characters': '72', 'x-accel-buffering': 'no', 'Access-Control-Allow-Origin': '*', 'Vary': 'origin', 'Vary': 'access-control-request-method', 'Vary': 'access-control-request-headers', 'Transfer-Encoding': 'chunked', 'Date': 'Thu, 11 Apr 2024 22:43:33 GMT')>\n"

View File

@ -0,0 +1,20 @@
{
"choices": [
{
"finish_reason": "length",
"index": 0,
"logprobs": null,
"text": " PR for flake8"
}
],
"created": 1712875413,
"id": "",
"model": "TinyLlama/TinyLlama-1.1B-Chat-v1.0",
"object": "text_completion",
"system_fingerprint": "1.4.5-native",
"usage": {
"completion_tokens": 5,
"prompt_tokens": 6,
"total_tokens": 11
}
}

View File

@ -0,0 +1,103 @@
import pytest
import requests
import json
from aiohttp import ClientSession
@pytest.fixture(scope="module")
def flash_llama_completion_handle(launcher):
with launcher(
"TinyLlama/TinyLlama-1.1B-Chat-v1.0",
) as handle:
yield handle
@pytest.fixture(scope="module")
async def flash_llama_completion(flash_llama_completion_handle):
await flash_llama_completion_handle.health(300)
return flash_llama_completion_handle.client
# NOTE: since `v1/completions` is a deprecated inferface/endpoint we do not provide a convience
# method for it. Instead, we use the `requests` library to make the HTTP request directly.
def test_flash_llama_completion_single_prompt(
flash_llama_completion, response_snapshot
):
response = requests.post(
f"{flash_llama_completion.base_url}/v1/completions",
json={
"model": "tgi",
"prompt": "Say this is a test",
"max_tokens": 5,
"seed": 0,
},
headers=flash_llama_completion.headers,
stream=False,
)
response = response.json()
assert len(response["choices"]) == 1
response == response_snapshot
def test_flash_llama_completion_many_prompts(flash_llama_completion, response_snapshot):
response = requests.post(
f"{flash_llama_completion.base_url}/v1/completions",
json={
"model": "tgi",
"prompt": ["Say", "this", "is", "a"],
"max_tokens": 10,
"seed": 0,
},
headers=flash_llama_completion.headers,
stream=False,
)
response = response.json()
assert len(response["choices"]) == 4
all_indexes = [choice["index"] for choice in response["choices"]]
all_indexes.sort()
assert all_indexes == [0, 1, 2, 3]
response == response_snapshot
async def test_flash_llama_completion_many_prompts_stream(
flash_llama_completion, response_snapshot
):
request = {
"model": "tgi",
"prompt": [
"What color is the sky?",
"Is water wet?",
"What is the capital of France?",
"def mai",
],
"max_tokens": 10,
"seed": 0,
"stream": True,
}
url = f"{flash_llama_completion.base_url}/v1/completions"
async with ClientSession(headers=flash_llama_completion.headers) as session:
async with session.post(url, json=request) as response:
# iterate over the stream
async for chunk in response.content.iter_any():
# remove "data:"
chunk = chunk.decode().split("\n\n")
# remove "data:" if present
chunk = [c.replace("data:", "") for c in chunk]
# remove empty strings
chunk = [c for c in chunk if c]
# parse json
chunk = [json.loads(c) for c in chunk]
for c in chunk:
assert "choices" in c
assert 0 <= c["choices"][0]["index"] <= 4
assert response.status == 200
response == response_snapshot

View File

@ -414,6 +414,10 @@ struct Args {
/// Display a lot of information about your runtime environment
#[clap(long, short, action)]
env: bool,
/// Control the maximum number of inputs that a client can send in a single request
#[clap(default_value = "4", long, env)]
max_client_batch_size: usize,
}
#[derive(Debug)]
@ -1078,6 +1082,8 @@ fn spawn_webserver(
// Start webserver
tracing::info!("Starting Webserver");
let mut router_args = vec![
"--max-client-batch-size".to_string(),
args.max_client_batch_size.to_string(),
"--max-concurrent-requests".to_string(),
args.max_concurrent_requests.to_string(),
"--max-best-of".to_string(),

View File

@ -155,6 +155,8 @@ pub struct Info {
pub max_batch_size: Option<usize>,
#[schema(example = "2")]
pub validation_workers: usize,
#[schema(example = "32")]
pub max_client_batch_size: usize,
/// Router Info
#[schema(example = "0.5.0")]
pub version: &'static str,
@ -280,6 +282,34 @@ fn default_parameters() -> GenerateParameters {
}
}
mod prompt_serde {
use serde::{self, Deserialize, Deserializer};
use serde_json::Value;
pub fn deserialize<'de, D>(deserializer: D) -> Result<Vec<String>, D::Error>
where
D: Deserializer<'de>,
{
let value = Value::deserialize(deserializer)?;
match value {
Value::String(s) => Ok(vec![s]),
Value::Array(arr) if arr.is_empty() => Err(serde::de::Error::custom(
"Empty array detected. Do not use an empty array for the prompt.",
)),
Value::Array(arr) => arr
.iter()
.map(|v| match v {
Value::String(s) => Ok(s.to_owned()),
_ => Err(serde::de::Error::custom("Expected a string")),
})
.collect(),
_ => Err(serde::de::Error::custom(
"Expected a string or an array of strings",
)),
}
}
}
#[derive(Clone, Deserialize, Serialize, ToSchema, Debug)]
pub struct CompletionRequest {
/// UNUSED
@ -289,7 +319,8 @@ pub struct CompletionRequest {
/// The prompt to generate completions for.
#[schema(example = "What is Deep Learning?")]
pub prompt: String,
#[serde(deserialize_with = "prompt_serde::deserialize")]
pub prompt: Vec<String>,
/// The maximum number of tokens that can be generated in the chat completion.
#[serde(default)]
@ -925,6 +956,20 @@ pub(crate) struct Details {
pub top_tokens: Vec<Vec<Token>>,
}
impl Default for Details {
fn default() -> Self {
Self {
finish_reason: FinishReason::Length,
generated_tokens: 0,
seed: None,
prefill: Vec::new(),
tokens: Vec::new(),
best_of_sequences: None,
top_tokens: Vec::new(),
}
}
}
#[derive(Serialize, ToSchema)]
pub(crate) struct GenerateResponse {
#[schema(example = "test")]

View File

@ -78,6 +78,8 @@ struct Args {
messages_api_enabled: bool,
#[clap(long, env, default_value_t = false)]
disable_grammar_support: bool,
#[clap(default_value = "4", long, env)]
max_client_batch_size: usize,
}
#[tokio::main]
@ -112,6 +114,7 @@ async fn main() -> Result<(), RouterError> {
ngrok_edge,
messages_api_enabled,
disable_grammar_support,
max_client_batch_size,
} = args;
// Launch Tokio runtime
@ -393,6 +396,7 @@ async fn main() -> Result<(), RouterError> {
tokenizer_config,
messages_api_enabled,
disable_grammar_support,
max_client_batch_size,
)
.await?;
Ok(())

View File

@ -16,6 +16,7 @@ use crate::{
CompletionRequest, DeltaToolCall, Function, Tool, VertexRequest, VertexResponse,
};
use crate::{FunctionDefinition, FunctionRef, FunctionsMap, Properties, ToolCall, ToolType, Tools};
use async_stream::__private::AsyncStream;
use axum::extract::Extension;
use axum::http::{HeaderMap, Method, StatusCode};
use axum::response::sse::{Event, KeepAlive, Sse};
@ -23,8 +24,8 @@ use axum::response::{IntoResponse, Response};
use axum::routing::{get, post};
use axum::{http, Json, Router};
use axum_tracing_opentelemetry::middleware::OtelAxumLayer;
use futures::stream::FuturesUnordered;
use futures::stream::StreamExt;
use futures::stream::{FuturesOrdered, FuturesUnordered};
use futures::Stream;
use futures::TryStreamExt;
use metrics_exporter_prometheus::{Matcher, PrometheusBuilder, PrometheusHandle};
@ -36,7 +37,9 @@ use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use text_generation_client::{ShardInfo, ShardedClient};
use tokenizers::Tokenizer;
use tokio::select;
use tokio::signal;
use tokio::sync::oneshot;
use tokio::time::Instant;
use tower_http::cors::{AllowOrigin, CorsLayer};
use tracing::{info_span, instrument, Instrument};
@ -597,100 +600,283 @@ async fn completions(
));
}
// build the request passing some parameters
let generate_request = GenerateRequest {
inputs: req.prompt.to_string(),
parameters: GenerateParameters {
best_of: None,
temperature: req.temperature,
repetition_penalty: req.repetition_penalty,
frequency_penalty: req.frequency_penalty,
top_k: None,
top_p: req.top_p,
typical_p: None,
do_sample: true,
max_new_tokens,
return_full_text: None,
stop: Vec::new(),
truncate: None,
watermark: false,
details: true,
decoder_input_details: !stream,
seed,
top_n_tokens: None,
grammar: None,
},
};
if req.prompt.len() > info.max_client_batch_size {
metrics::increment_counter!("tgi_request_failure", "err" => "validation");
return Err((
StatusCode::UNPROCESSABLE_ENTITY,
Json(ErrorResponse {
error: format!(
"Number of prompts exceeds the maximum allowed batch size of {}",
info.max_client_batch_size
),
error_type: "batch size exceeded".to_string(),
}),
));
}
let generate_requests: Vec<GenerateRequest> = req
.prompt
.iter()
.map(|prompt| GenerateRequest {
inputs: prompt.to_string(),
parameters: GenerateParameters {
best_of: None,
temperature: req.temperature,
repetition_penalty: req.repetition_penalty,
frequency_penalty: req.frequency_penalty,
top_k: None,
top_p: req.top_p,
typical_p: None,
do_sample: true,
max_new_tokens,
return_full_text: None,
stop: Vec::new(),
truncate: None,
watermark: false,
details: true,
decoder_input_details: !stream,
seed,
top_n_tokens: None,
grammar: None,
},
})
.collect();
let mut x_compute_type = None;
let mut x_compute_characters = 0u32;
let mut x_accel_buffering = None;
if stream {
let on_message_callback = move |stream_token: StreamResponse| {
let event = Event::default();
let mut response_streams = FuturesOrdered::new();
for (index, generate_request) in generate_requests.into_iter().enumerate() {
let model_id = info.model_id.clone();
let system_fingerprint =
format!("{}-{}", info.version, info.docker_label.unwrap_or("native"));
let infer_clone = infer.clone();
let compute_type_clone = compute_type.clone();
let current_time = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_else(|_| std::time::Duration::from_secs(0))
.as_secs();
// Create a future for each generate_stream_internal call.
let generate_future = async move {
let on_message_callback = move |stream_token: StreamResponse| {
let event = Event::default();
event
.json_data(CompletionCompleteChunk {
id: "".to_string(),
object: "text_completion".to_string(),
created: current_time,
let current_time = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_else(|_| std::time::Duration::from_secs(0))
.as_secs();
choices: vec![CompletionComplete {
finish_reason: "".to_string(),
index: 0,
logprobs: None,
text: stream_token.token.text,
}],
event
.json_data(CompletionCompleteChunk {
id: "".to_string(),
object: "text_completion".to_string(),
created: current_time,
model: info.model_id.clone(),
system_fingerprint: format!(
"{}-{}",
info.version,
info.docker_label.unwrap_or("native")
),
})
.map_or_else(
|e| {
println!("Failed to serialize ChatCompletionChunk: {:?}", e);
Event::default()
},
|data| data,
)
choices: vec![CompletionComplete {
finish_reason: "".to_string(),
index: index as u32,
logprobs: None,
text: stream_token.token.text,
}],
model: model_id.clone(),
system_fingerprint: system_fingerprint.clone(),
})
.map_or_else(|_e| Event::default(), |data| data)
};
let (header_tx, header_rx) = oneshot::channel();
let (sse_tx, sse_rx) = tokio::sync::mpsc::unbounded_channel();
tokio::spawn(async move {
let (header_map, sse) = generate_stream_internal(
infer_clone.clone(),
compute_type_clone.clone(),
Json(generate_request),
on_message_callback,
)
.await;
// send and dont wait for response
let _ = header_tx.send(header_map);
// pin an emit messages to the sse_tx
let mut sse = Box::pin(sse);
while let Some(event) = sse.next().await {
sse_tx.send(event).expect("Failed to send event");
}
});
(header_rx, sse_rx)
};
response_streams.push_back(generate_future);
}
let mut all_rxs = vec![];
while let Some((header_rx, sse_rx)) = response_streams.next().await {
all_rxs.push(sse_rx);
// get the headers from the first response of each stream
let headers = header_rx.await.expect("Failed to get headers");
if x_compute_type.is_none() {
x_compute_type = headers
.get("x-compute-type")
.and_then(|v| v.to_str().ok())
.map(|v| v.to_string());
x_accel_buffering = headers
.get("x-accel-buffering")
.and_then(|v| v.to_str().ok())
.map(|v| v.to_string());
}
x_compute_characters += headers
.get("x-compute-characters")
.and_then(|v| v.to_str().ok())
.and_then(|v| v.parse().ok())
.unwrap_or(0);
}
let mut headers = HeaderMap::new();
if let Some(x_compute_type) = x_compute_type {
headers.insert("x-compute-type", x_compute_type.parse().unwrap());
}
headers.insert("x-compute-characters", x_compute_characters.into());
if let Some(x_accel_buffering) = x_accel_buffering {
headers.insert("x-accel-buffering", x_accel_buffering.parse().unwrap());
}
// now sink the sse streams into a single stream and remove the ones that are done
let stream: AsyncStream<Result<Event, Infallible>, _> = async_stream::stream! {
loop {
let mut i = 0;
while i < all_rxs.len() {
let rx = &mut all_rxs[i];
select! {
Some(event) = rx.recv() => {
yield event;
}
else => {
all_rxs.remove(i);
continue; // skip the increment to handle the next element at the same index
}
}
i += 1; // only increment when no element was removed
}
if all_rxs.is_empty() {
break;
}
}
};
let (headers, response_stream) = generate_stream_internal(
infer,
compute_type,
Json(generate_request),
on_message_callback,
)
.await;
let sse = Sse::new(response_stream).keep_alive(KeepAlive::default());
let sse = Sse::new(stream).keep_alive(KeepAlive::default());
Ok((headers, sse).into_response())
} else {
let (headers, Json(generation)) = generate(
Extension(infer),
Extension(compute_type),
Json(generate_request),
)
.await?;
let current_time = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_else(|_| std::time::Duration::from_secs(0))
.as_secs();
let details = generation.details.ok_or((
// this should never happen but handle if details are missing unexpectedly
StatusCode::INTERNAL_SERVER_ERROR,
Json(ErrorResponse {
error: "No details in generation".to_string(),
error_type: "no details".to_string(),
}),
))?;
let responses = FuturesUnordered::new();
for (index, generate_request) in generate_requests.into_iter().enumerate() {
let infer_clone = infer.clone();
let compute_type_clone = compute_type.clone();
let response_future = async move {
let result = generate(
Extension(infer_clone),
Extension(compute_type_clone),
Json(generate_request),
)
.await;
result.map(|(headers, generation)| (index, headers, generation))
};
responses.push(response_future);
}
let generate_responses = responses.try_collect::<Vec<_>>().await?;
let mut prompt_tokens = 0u32;
let mut completion_tokens = 0u32;
let mut total_tokens = 0u32;
let mut x_compute_time = 0u32;
let mut x_total_time = 0u32;
let mut x_validation_time = 0u32;
let mut x_queue_time = 0u32;
let mut x_inference_time = 0u32;
let mut x_time_per_token = 0u32;
let mut x_prompt_tokens = 0u32;
let mut x_generated_tokens = 0u32;
let choices = generate_responses
.into_iter()
.map(|(index, headers, Json(generation))| {
let details = generation.details.ok_or((
// this should never happen but handle if details are missing unexpectedly
StatusCode::INTERNAL_SERVER_ERROR,
Json(ErrorResponse {
error: "No details in generation".to_string(),
error_type: "no details".to_string(),
}),
))?;
if x_compute_type.is_none() {
x_compute_type = headers
.get("x-compute-type")
.and_then(|v| v.to_str().ok())
.map(|v| v.to_string());
}
// accumulate headers and usage from each response
x_compute_time += headers
.get("x-compute-time")
.and_then(|v| v.to_str().ok()?.parse().ok())
.unwrap_or(0);
x_compute_characters += headers
.get("x-compute-characters")
.and_then(|v| v.to_str().ok()?.parse().ok())
.unwrap_or(0);
x_total_time += headers
.get("x-total-time")
.and_then(|v| v.to_str().ok()?.parse().ok())
.unwrap_or(0);
x_validation_time += headers
.get("x-validation-time")
.and_then(|v| v.to_str().ok()?.parse().ok())
.unwrap_or(0);
x_queue_time += headers
.get("x-queue-time")
.and_then(|v| v.to_str().ok()?.parse().ok())
.unwrap_or(0);
x_inference_time += headers
.get("x-inference-time")
.and_then(|v| v.to_str().ok()?.parse().ok())
.unwrap_or(0);
x_time_per_token += headers
.get("x-time-per-token")
.and_then(|v| v.to_str().ok()?.parse().ok())
.unwrap_or(0);
x_prompt_tokens += headers
.get("x-prompt-tokens")
.and_then(|v| v.to_str().ok()?.parse().ok())
.unwrap_or(0);
x_generated_tokens += headers
.get("x-generated-tokens")
.and_then(|v| v.to_str().ok()?.parse().ok())
.unwrap_or(0);
prompt_tokens += details.prefill.len() as u32;
completion_tokens += details.generated_tokens;
total_tokens += details.prefill.len() as u32 + details.generated_tokens;
Ok(CompletionComplete {
finish_reason: details.finish_reason.to_string(),
index: index as u32,
logprobs: None,
text: generation.generated_text,
})
})
.collect::<Result<Vec<_>, _>>()
.map_err(|(status, Json(err))| (status, Json(err)))?;
let response = Completion {
id: "".to_string(),
@ -702,19 +888,30 @@ async fn completions(
info.version,
info.docker_label.unwrap_or("native")
),
choices: vec![CompletionComplete {
finish_reason: details.finish_reason.to_string(),
index: 0,
logprobs: None,
text: generation.generated_text,
}],
choices,
usage: Usage {
prompt_tokens: details.prefill.len() as u32,
completion_tokens: details.generated_tokens,
total_tokens: details.prefill.len() as u32 + details.generated_tokens,
prompt_tokens,
completion_tokens,
total_tokens,
},
};
// headers similar to `generate` but aggregated
let mut headers = HeaderMap::new();
if let Some(x_compute_type) = x_compute_type {
headers.insert("x-compute-type", x_compute_type.parse().unwrap());
}
headers.insert("x-compute-characters", x_compute_characters.into());
headers.insert("x-total-time", x_total_time.into());
headers.insert("x-validation-time", x_validation_time.into());
headers.insert("x-queue-time", x_queue_time.into());
headers.insert("x-inference-time", x_inference_time.into());
headers.insert("x-time-per-token", x_time_per_token.into());
headers.insert("x-prompt-tokens", x_prompt_tokens.into());
headers.insert("x-generated-tokens", x_generated_tokens.into());
if let Some(x_accel_buffering) = x_accel_buffering {
headers.insert("x-accel-buffering", x_accel_buffering.parse().unwrap());
}
Ok((headers, Json(response)).into_response())
}
}
@ -1166,6 +1363,7 @@ pub async fn run(
tokenizer_config: HubTokenizerConfig,
messages_api_enabled: bool,
grammar_support: bool,
max_client_batch_size: usize,
) -> Result<(), axum::BoxError> {
// OpenAPI documentation
#[derive(OpenApi)]
@ -1340,6 +1538,7 @@ pub async fn run(
max_waiting_tokens,
max_batch_size,
validation_workers,
max_client_batch_size,
version: env!("CARGO_PKG_VERSION"),
sha: option_env!("VERGEN_GIT_SHA"),
docker_label: option_env!("DOCKER_LABEL"),